colexec: add "recursive" merging to external sort
Previously, the external sorter was merging all partitions at once (i.e.
there was a single merger). This is problematic because each partition
uses some amount of RAM for its buffer, and we want to make sure that all
partitions together do not exceed the memory limit.

This is now addressed by splitting the previous "merging" stage into three
(an illustrative sketch of the resulting merge scheduling follows the
message):
1. a new "repeated merging" stage that is responsible for merging the
current partitions and spilling the resulting partitions to disk until only
one is left. This might happen while we're still consuming the input.
2. a new "final merging" stage that sets up an emitter which merges all of
the remaining partitions. This occurs only once the input has been fully
consumed and all remaining partitions can be merged at once, without having
to spill to disk.
3. a new "emitting" stage that simply emits the output.

Release note: None
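A standalone sketch of the merge scheduling described above (illustrative
only, not the operator code: plain int slices stand in for on-disk
partitions, and a naive k-way merge stands in for the OrderedSynchronizer).
With a fan-in limit of maxNumberPartitions, sorted runs are repeatedly
merged into new runs until a single final merge can emit the output.

package main

import "fmt"

// mergeRuns merges already-sorted slices into one sorted slice by repeatedly
// taking the smallest head element (a naive k-way merge).
func mergeRuns(runs [][]int) []int {
	var out []int
	for {
		best := -1
		for i, r := range runs {
			if len(r) == 0 {
				continue
			}
			if best == -1 || r[0] < runs[best][0] {
				best = i
			}
		}
		if best == -1 {
			return out
		}
		out = append(out, runs[best][0])
		runs[best] = runs[best][1:]
	}
}

func main() {
	const maxNumberPartitions = 2 // fan-in limit
	partitions := [][]int{{1, 5}, {2, 6}, {3, 7}, {0, 4}}

	// "Repeated merging": while too many runs are active, merge the first
	// maxNumberPartitions of them into a single new run (a "spill").
	for len(partitions) > maxNumberPartitions {
		merged := mergeRuns(partitions[:maxNumberPartitions])
		rest := append([][]int{}, partitions[maxNumberPartitions:]...)
		partitions = append(rest, merged)
	}
	// "Final merging" + "emitting": one last merge produces the output.
	fmt.Println(mergeRuns(partitions)) // [0 1 2 3 4 5 6 7]
}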
yuzefovich committed Feb 20, 2020
1 parent dd98a30 commit 3429c96
Showing 6 changed files with 204 additions and 69 deletions.
14 changes: 14 additions & 0 deletions pkg/sql/colexec/allocator.go
@@ -155,6 +155,20 @@ func (a *Allocator) Used() int64 {

// Clear clears up the memory account of the allocator.
func (a *Allocator) Clear() {
// TODO(yuzefovich): usage of the a.acc.Clear method is not compatible with
// our PerformOperation. Consider the following scenario:
// 1. we allocate an []int64 Vec with 1024 capacity, and this memory is
// registered with the allocator (8192 bytes).
// 2. then we Clear() the allocator; now the memory account says that we use
// 0 bytes, but the actual memory is not released.
// 3. we reuse the Vec and append 1024 int64 values into it. Because there
// was enough capacity to accommodate them, no new memory is allocated.
// 4. when we call PerformOperation to account for that memory, it computes
// "before" and "after" memory estimates, and in both cases they equal 8192
// bytes, so the "delta" is 0, and the allocator continues to think that 0
// bytes have been registered with it, which is not the case.
// I think the "adding disk queue to hash router" PR has more fine-grained
// methods for working with memory. Consider reusing them here.
a.acc.Clear(a.ctx)
}
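A toy, self-contained model of the scenario described in the TODO above (it
does not use the CockroachDB allocator; the memory account is just an int64
counter, and the 8192-byte figures mirror the TODO's example): clearing the
account without releasing the buffer makes a later before/after delta come
out as zero, so the account ends up under-reporting actual usage.

package main

import "fmt"

// sizeOf is a stand-in for the allocator's size estimate of an int64 vector.
func sizeOf(v []int64) int64 { return int64(cap(v)) * 8 }

func main() {
	var accountUsed int64

	vec := make([]int64, 1024) // step 1: 1024 * 8 = 8192 bytes allocated
	accountUsed += sizeOf(vec) //         and registered with the "account"

	accountUsed = 0 // step 2: "Clear()": the account now says 0 bytes,
	                //         but vec still holds its 8192-byte buffer

	before := sizeOf(vec)
	vec = append(vec[:0], make([]int64, 1024)...) // step 3: reuse the existing
	after := sizeOf(vec)                          // capacity; no new allocation

	accountUsed += after - before // step 4: the "delta" is 0

	fmt.Printf("account: %d bytes, actual buffer: %d bytes\n", accountUsed, sizeOf(vec))
	// prints: account: 0 bytes, actual buffer: 8192 bytes
}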

7 changes: 7 additions & 0 deletions pkg/sql/colexec/execplan.go
@@ -17,6 +17,7 @@ import (
"reflect"

"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/typeconv"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
@@ -93,6 +94,7 @@ type NewColOperatorArgs struct {
Inputs []Operator
StreamingMemAccount *mon.BoundAccount
ProcessorConstructor execinfra.ProcessorConstructor
DiskQueueCfg colcontainer.DiskQueueCfg
TestingKnobs struct {
// UseStreamingMemAccountForBuffering specifies whether to use
// StreamingMemAccount when creating buffering operators and should only be
@@ -108,6 +110,9 @@ type NewColOperatorArgs struct {
// DiskSpillingDisabled specifies whether only in-memory operators should
// be created.
DiskSpillingDisabled bool
// MaxNumberPartitions determines the maximum number of "active"
// partitions for the Partitioner interface.
MaxNumberPartitions int
}
}

@@ -770,7 +775,9 @@ func NewColOperator(
unlimitedAllocator,
input, inputTypes, core.Sorter.OutputOrdering,
execinfra.GetWorkMemLimit(flowCtx.Cfg),
args.TestingKnobs.MaxNumberPartitions,
diskQueuesUnlimitedAllocator,
args.DiskQueueCfg,
)
},
args.TestingKnobs.SpillingCallbackFn,
219 changes: 154 additions & 65 deletions pkg/sql/colexec/external_sort.go
@@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/coltypes"
"github.com/cockroachdb/cockroach/pkg/sql/colcontainer"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/execerror"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
)
@@ -47,9 +48,20 @@ const (
// indicates that the end of the partition has been reached and we should
// transition to starting a new partition.
externalSorterSpillPartition
// externalSorterMerging indicates that we have fully consumed the input and
// should proceed to merging the partitions and emitting the batches.
externalSorterMerging
// externalSorterRepeatedMerging indicates that we need to merge several
// partitions into one and spill that new partition to disk. The number of
// partitions that we can merge at a time is determined by
// maxNumberPartitions. This procedure will be repeated until only one
// partition is left, and we transition to externalSorterNewPartition state.
externalSorterRepeatedMerging
// externalSorterFinalMerging indicates that we have fully consumed the input
// and can merge all of the partitions in one step. We then transition to
// externalSorterEmitting state.
externalSorterFinalMerging
// externalSorterEmitting indicates that we are ready to emit output. A zero-
// length batch in this state indicates that we have emitted all tuples and
// should transition to externalSorterFinished state.
externalSorterEmitting
// externalSorterFinished indicates that all tuples from all partitions have
// been emitted and from now on only a zero-length batch will be emitted by
// the external sorter. This state is also responsible for closing the
@@ -63,10 +75,8 @@ const (
// divide up all batches from the input into partitions, sort each partition in
// memory, and write sorted partitions to disk
// 2. it will use OrderedSynchronizer to merge the partitions.
// TODO(yuzefovich): we probably want to have a maximum number of partitions at
// a time. In that case, we might need several stages of mergers.
//
// The diagram of the components involved is as follows:
// The (simplified) diagram of the components involved is as follows:
//
// input
// |
@@ -95,19 +105,33 @@ const (
// whether a new partition must be started
// - external sorter resets in-memory sorter (which, in turn, resets input
// partitioner) once the full partition has been spilled to disk.
//
// What is hidden in the diagram is the fact that at some point we might need
// to merge several partitions into a new one that we spill to disk in order to
// reduce the number of "active" partitions. This requirement comes from the
// need to limit the number of "active" partitions because each partition uses
// some amount of RAM for its buffer. This limit is determined by the
// maxNumberPartitions variable.
type externalSorter struct {
OneInputNode
NonExplainable

unlimitedAllocator *Allocator
state externalSorterState
inputTypes []coltypes.T
ordering execinfrapb.Ordering
inMemSorter resettableOperator
partitioner Partitioner
numPartitions int
merger Operator
singlePartitionOutput Operator
unlimitedAllocator *Allocator
memoryLimit int64
state externalSorterState
inputDone bool
inputTypes []coltypes.T
ordering execinfrapb.Ordering
inMemSorter resettableOperator
partitioner Partitioner
// numPartitions is the current number of partitions.
numPartitions int
// firstPartitionIdx is the index of the first partition to merge next.
firstPartitionIdx int
maxNumberPartitions int
cfg colcontainer.DiskQueueCfg

emitter Operator
}

var _ Operator = &externalSorter{}
@@ -120,14 +144,19 @@ var _ Operator = &externalSorter{}
// - diskQueuesUnlimitedAllocator is a (temporary) unlimited allocator that
// will be used to create dummy queues and will be removed once we have actual
// disk-backed queues.
// - maxNumberPartitions (when non-zero) overrides the semi-dynamically
// computed maximum number of partitions to have at once. It should be
// non-zero only in tests.
func newExternalSorter(
unlimitedAllocator *Allocator,
input Operator,
inputTypes []coltypes.T,
ordering execinfrapb.Ordering,
memoryLimit int64,
maxNumberPartitions int,
// TODO(yuzefovich): remove this once actual disk queues are in-place.
diskQueuesUnlimitedAllocator *Allocator,
cfg colcontainer.DiskQueueCfg,
) Operator {
inputPartitioner := newInputPartitioningOperator(unlimitedAllocator, input, memoryLimit)
inMemSorter, err := newSorter(
@@ -137,13 +166,32 @@ func newExternalSorter(
if err != nil {
execerror.VectorizedInternalPanic(err)
}
if cfg.BufferSizeBytes > 0 && maxNumberPartitions == 0 {
// Each disk queue will use up to BufferSizeBytes of RAM, so we will give
// it almost all of the available memory (except for a single output batch
// that mergers will use).
batchMemSize := estimateBatchSizeBytes(inputTypes, int(coldata.BatchSize()))
// TODO(yuzefovich): we currently allocate a full-sized batch in
// partitionerToOperator, but once we use actual disk-backed queues, we
// should allocate zero-sized batch in there and all memory will be
// allocated by the partitioner (and will be included in BufferSizeBytes).
maxNumberPartitions = (int(memoryLimit) - batchMemSize) / (cfg.BufferSizeBytes + batchMemSize)
}
// In order to make progress when merging we have to merge at least two
// partitions.
if maxNumberPartitions < 2 {
maxNumberPartitions = 2
}
return &externalSorter{
OneInputNode: NewOneInputNode(inMemSorter),
unlimitedAllocator: unlimitedAllocator,
inMemSorter: inMemSorter,
partitioner: newDummyPartitioner(diskQueuesUnlimitedAllocator, inputTypes),
inputTypes: inputTypes,
ordering: ordering,
OneInputNode: NewOneInputNode(inMemSorter),
unlimitedAllocator: unlimitedAllocator,
memoryLimit: memoryLimit,
inMemSorter: inMemSorter,
partitioner: newDummyPartitioner(diskQueuesUnlimitedAllocator, inputTypes),
inputTypes: inputTypes,
ordering: ordering,
maxNumberPartitions: maxNumberPartitions,
cfg: cfg,
}
}
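A worked example of the maxNumberPartitions computation in newExternalSorter
above, using illustrative numbers rather than CockroachDB defaults: one
output batch is reserved for the merger, every active partition costs one
disk-queue buffer plus one batch, and the result is floored at two so that
merging can make progress.

package main

import "fmt"

// maxPartitions is a simplified mirror of the computation above: reserve one
// output batch for the merger, then fit as many (buffer + batch) pairs as
// possible under the memory limit, with a floor of two.
func maxPartitions(memoryLimit, bufferSizeBytes, batchMemSize int) int {
	n := (memoryLimit - batchMemSize) / (bufferSizeBytes + batchMemSize)
	if n < 2 {
		n = 2 // we must merge at least two partitions to make progress
	}
	return n
}

func main() {
	// Illustrative numbers: 64 MiB memory limit, 1 MiB disk-queue buffers,
	// and batches estimated at 128 KiB.
	fmt.Println(maxPartitions(64<<20, 1<<20, 128<<10)) // 56
	// A tiny limit still yields the floor of 2.
	fmt.Println(maxPartitions(1<<20, 1<<20, 128<<10)) // 2
}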

@@ -160,72 +208,94 @@ func (s *externalSorter) Next(ctx context.Context) coldata.Batch {
if b.Length() == 0 {
// The input has been fully exhausted, so we transition to "merging
// partitions" state.
s.state = externalSorterMerging
s.inputDone = true
s.state = externalSorterRepeatedMerging
continue
}
if err := s.partitioner.Enqueue(s.numPartitions, b); err != nil {
newPartitionIdx := s.firstPartitionIdx + s.numPartitions
if err := s.partitioner.Enqueue(newPartitionIdx, b); err != nil {
execerror.VectorizedInternalPanic(err)
}
s.state = externalSorterSpillPartition
continue
case externalSorterSpillPartition:
curPartitionIdx := s.firstPartitionIdx + s.numPartitions
b := s.input.Next(ctx)
if b.Length() == 0 {
// The partition has been fully spilled, so we reset the in-memory
// sorter (which will reset inputPartitioningOperator) and transition
// to "new partition" state.
// sorter (which will reset inputPartitioningOperator).
s.inMemSorter.reset()
s.state = externalSorterNewPartition
s.numPartitions++
if s.numPartitions == s.maxNumberPartitions {
// We have reached the maximum number of active partitions, so we
// need to merge them and spill the new partition to disk before we
// can proceed on consuming the input.
s.state = externalSorterRepeatedMerging
continue
}
s.state = externalSorterNewPartition
continue
}
if err := s.partitioner.Enqueue(s.numPartitions, b); err != nil {
if err := s.partitioner.Enqueue(curPartitionIdx, b); err != nil {
execerror.VectorizedInternalPanic(err)
}
continue
case externalSorterMerging:
// Ideally, we should not be in such a state that we have zero or one
// partition (we should have not spilled in such scenario), but we want
// to be safe and handle those cases anyway.
case externalSorterRepeatedMerging:
if s.numPartitions < 2 {
if s.inputDone {
s.state = externalSorterFinalMerging
} else {
s.state = externalSorterNewPartition
}
continue
}
numPartitionsToMerge := s.maxNumberPartitions
if numPartitionsToMerge > s.numPartitions {
numPartitionsToMerge = s.numPartitions
}
if numPartitionsToMerge == s.numPartitions && s.inputDone {
// The input has been fully consumed and we can merge all of the
// remaining partitions, so we transition to "final merging" state.
s.state = externalSorterFinalMerging
continue
}
// We will merge all partitions in range [s.firstPartitionIdx,
// s.firstPartitionIdx+numPartitionsToMerge) and will spill all the
// resulting batches into a new partition with the next available
// index.
merger := s.createMergerForPartitions(s.firstPartitionIdx, numPartitionsToMerge)
merger.Init()
newPartitionIdx := s.firstPartitionIdx + s.numPartitions
for b := merger.Next(ctx); b.Length() > 0; b = merger.Next(ctx) {
if err := s.partitioner.Enqueue(newPartitionIdx, b); err != nil {
execerror.VectorizedInternalPanic(err)
}
}
s.firstPartitionIdx += numPartitionsToMerge
s.numPartitions -= numPartitionsToMerge - 1
s.unlimitedAllocator.Clear()
continue
case externalSorterFinalMerging:
if s.numPartitions == 0 {
s.state = externalSorterFinished
continue
} else if s.numPartitions == 1 {
if s.singlePartitionOutput == nil {
s.singlePartitionOutput = newPartitionerToOperator(
s.unlimitedAllocator, s.inputTypes, s.partitioner, 0, /* partitionIdx */
)
s.singlePartitionOutput.Init()
}
b := s.singlePartitionOutput.Next(ctx)
if b.Length() == 0 {
s.state = externalSorterFinished
continue
}
return b
s.emitter = newPartitionerToOperator(
s.unlimitedAllocator, s.inputTypes, s.partitioner, s.firstPartitionIdx,
)
} else {
if s.merger == nil {
syncInputs := make([]Operator, s.numPartitions)
for i := range syncInputs {
syncInputs[i] = newPartitionerToOperator(
s.unlimitedAllocator, s.inputTypes, s.partitioner, i,
)
}
s.merger = NewOrderedSynchronizer(
s.unlimitedAllocator,
syncInputs,
s.inputTypes,
execinfrapb.ConvertToColumnOrdering(s.ordering),
)
s.merger.Init()
}
b := s.merger.Next(ctx)
if b.Length() == 0 {
s.state = externalSorterFinished
continue
}
return b
s.emitter = s.createMergerForPartitions(s.firstPartitionIdx, s.numPartitions)
}
s.emitter.Init()
s.state = externalSorterEmitting
continue
case externalSorterEmitting:
b := s.emitter.Next(ctx)
if b.Length() == 0 {
s.state = externalSorterFinished
continue
}
return b
case externalSorterFinished:
if err := s.partitioner.Close(); err != nil {
execerror.VectorizedInternalPanic(err)
@@ -237,6 +307,23 @@ func (s *externalSorter) Next(ctx context.Context) coldata.Batch {
}
}

// createMergerForPartitions creates an ordered synchronizer that will merge
// partitions in [firstIdx, firstIdx+numPartitions) range.
func (s *externalSorter) createMergerForPartitions(firstIdx, numPartitions int) Operator {
syncInputs := make([]Operator, numPartitions)
for i := range syncInputs {
syncInputs[i] = newPartitionerToOperator(
s.unlimitedAllocator, s.inputTypes, s.partitioner, firstIdx+i,
)
}
return NewOrderedSynchronizer(
s.unlimitedAllocator,
syncInputs,
s.inputTypes,
execinfrapb.ConvertToColumnOrdering(s.ordering),
)
}

func newInputPartitioningOperator(
unlimitedAllocator *Allocator, input Operator, memoryLimit int64,
) resettableOperator {
@@ -282,7 +369,9 @@ func newPartitionerToOperator(
return &partitionerToOperator{
partitioner: partitioner,
partitionIdx: partitionIdx,
batch: allocator.NewMemBatch(types),
// TODO(yuzefovich): allocate zero-sized batch once the disk-backed
// partitioner is used.
batch: allocator.NewMemBatch(types),
}
}
