From edd8549acdad7b0cc1d96b1b8c229e0958f5c984 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 8 Sep 2022 21:55:41 -0700 Subject: [PATCH] colexecdisk: improve the merge in external sort a bit In order to perform the repeated merging of partitions in the external sort, we use the ordered synchronizer. Previously, we would not specify the number of tuples that would be merged by the synchronizer, which resulted in its output batch growing dynamically. However, we do know that number exactly, so we can pass it as a hint to the synchronizer so that it attempts to allocate the largest batch right away. This should speed up the external sort a bit (I did run our microbenchmarks, and they didn't show any changes because we don't use "interesting" memory limits (they are either too large or too small for this change to make an impact)). Release note: None --- pkg/sql/colexec/colexecdisk/external_sort.go | 8 ++++++-- pkg/sql/colexec/ordered_synchronizer.eg.go | 12 +++++++++++- pkg/sql/colexec/ordered_synchronizer_test.go | 6 +++--- pkg/sql/colexec/ordered_synchronizer_tmpl.go | 12 +++++++++++- pkg/sql/colflow/vectorized_flow.go | 2 +- pkg/sql/colmem/allocator.go | 6 +++--- 6 files changed, 35 insertions(+), 11 deletions(-) diff --git a/pkg/sql/colexec/colexecdisk/external_sort.go b/pkg/sql/colexec/colexecdisk/external_sort.go index 8014a8f518f1..6d899925a8fd 100644 --- a/pkg/sql/colexec/colexecdisk/external_sort.go +++ b/pkg/sql/colexec/colexecdisk/external_sort.go @@ -673,10 +673,13 @@ func (s *externalSorter) createMergerForPartitions(n int) *colexec.OrderedSynchr ) } - // Calculate the limit on the output batch mem size. + // Calculate the limit on the output batch mem size as well as the total + // number of tuples to merge. 
outputBatchMemSize := s.mergeMemoryLimit + var tuplesToMerge int64 for i := s.numPartitions - n; i < s.numPartitions; i++ { outputBatchMemSize -= s.partitionsInfo.maxBatchMemSize[i] + tuplesToMerge += int64(s.partitionsInfo.tupleCount[i]) s.resetPartitionsInfo(i) } // It is possible that the expected usage of the dequeued batches already @@ -689,7 +692,8 @@ func (s *externalSorter) createMergerForPartitions(n int) *colexec.OrderedSynchr outputBatchMemSize = minOutputBatchMemSize } return colexec.NewOrderedSynchronizer( - s.outputUnlimitedAllocator, outputBatchMemSize, syncInputs, s.inputTypes, s.columnOrdering, + s.outputUnlimitedAllocator, outputBatchMemSize, syncInputs, + s.inputTypes, s.columnOrdering, tuplesToMerge, ) } diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go index f143a8a572d3..d71a8ea32639 100644 --- a/pkg/sql/colexec/ordered_synchronizer.eg.go +++ b/pkg/sql/colexec/ordered_synchronizer.eg.go @@ -43,6 +43,9 @@ type OrderedSynchronizer struct { ordering colinfo.ColumnOrdering typs []*types.T canonicalTypeFamilies []types.Family + // tuplesToMerge (when positive) tracks the number of tuples that are still + // to be merged by synchronizer. + tuplesToMerge int64 // inputBatches stores the current batch for each input. inputBatches []coldata.Batch @@ -80,18 +83,22 @@ func (o *OrderedSynchronizer) Child(nth int, verbose bool) execopnode.OpNode { // NewOrderedSynchronizer creates a new OrderedSynchronizer. // - memoryLimit will limit the size of batches produced by the synchronizer. +// - tuplesToMerge, if positive, indicates the total number of tuples that will +// be emitted by all inputs, use 0 if unknown. 
func NewOrderedSynchronizer( allocator *colmem.Allocator, memoryLimit int64, inputs []colexecargs.OpWithMetaInfo, typs []*types.T, ordering colinfo.ColumnOrdering, + tuplesToMerge int64, ) *OrderedSynchronizer { os := &OrderedSynchronizer{ inputs: inputs, ordering: ordering, typs: typs, canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs), + tuplesToMerge: tuplesToMerge, } os.accountingHelper.Init(allocator, memoryLimit, typs) return os @@ -256,13 +263,16 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } o.output.SetLength(outputIdx) + // Note that it's ok if this number becomes negative - the accounting helper + // will ignore it. + o.tuplesToMerge -= int64(outputIdx) return o.output } func (o *OrderedSynchronizer) resetOutput() { var reallocated bool o.output, reallocated = o.accountingHelper.ResetMaybeReallocate( - o.typs, o.output, 0, /* tuplesToBeSet */ + o.typs, o.output, int(o.tuplesToMerge), /* tuplesToBeSet */ ) if reallocated { o.outVecs.SetBatch(o.output) diff --git a/pkg/sql/colexec/ordered_synchronizer_test.go b/pkg/sql/colexec/ordered_synchronizer_test.go index e1e1af66b022..140026e68e74 100644 --- a/pkg/sql/colexec/ordered_synchronizer_test.go +++ b/pkg/sql/colexec/ordered_synchronizer_test.go @@ -148,7 +148,7 @@ func TestOrderedSync(t *testing.T) { typs[i] = types.Int } colexectestutils.RunTests(t, testAllocator, tc.sources, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) { - return NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering), nil + return NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering, 0 /* tuplesToMerge */), nil }) } } @@ -189,7 +189,7 @@ func TestOrderedSyncRandomInput(t *testing.T) { inputs[i].Root = colexectestutils.NewOpTestInput(testAllocator, batchSize, sources[i], typs) } ordering := 
colinfo.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}} - op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering) + op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */) op.Init(context.Background()) out := colexectestutils.NewOpTestOutput(op, expected) if err := out.Verify(); err != nil { @@ -218,7 +218,7 @@ func BenchmarkOrderedSynchronizer(b *testing.B) { } ordering := colinfo.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}} - op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering) + op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */) op.Init(ctx) b.SetBytes(8 * int64(coldata.BatchSize()) * numInputs) diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go index 05e159a8d820..96c308334659 100644 --- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go +++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go @@ -64,6 +64,9 @@ type OrderedSynchronizer struct { ordering colinfo.ColumnOrdering typs []*types.T canonicalTypeFamilies []types.Family + // tuplesToMerge (when positive) tracks the number of tuples that are still + // to be merged by synchronizer. + tuplesToMerge int64 // inputBatches stores the current batch for each input. inputBatches []coldata.Batch @@ -101,18 +104,22 @@ func (o *OrderedSynchronizer) Child(nth int, verbose bool) execopnode.OpNode { // NewOrderedSynchronizer creates a new OrderedSynchronizer. // - memoryLimit will limit the size of batches produced by the synchronizer. +// - tuplesToMerge, if positive, indicates the total number of tuples that will +// be emitted by all inputs, use 0 if unknown. 
func NewOrderedSynchronizer( allocator *colmem.Allocator, memoryLimit int64, inputs []colexecargs.OpWithMetaInfo, typs []*types.T, ordering colinfo.ColumnOrdering, + tuplesToMerge int64, ) *OrderedSynchronizer { os := &OrderedSynchronizer{ inputs: inputs, ordering: ordering, typs: typs, canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs), + tuplesToMerge: tuplesToMerge, } os.accountingHelper.Init(allocator, memoryLimit, typs) return os @@ -204,13 +211,16 @@ func (o *OrderedSynchronizer) Next() coldata.Batch { } o.output.SetLength(outputIdx) + // Note that it's ok if this number becomes negative - the accounting helper + // will ignore it. + o.tuplesToMerge -= int64(outputIdx) return o.output } func (o *OrderedSynchronizer) resetOutput() { var reallocated bool o.output, reallocated = o.accountingHelper.ResetMaybeReallocate( - o.typs, o.output, 0, /* tuplesToBeSet */ + o.typs, o.output, int(o.tuplesToMerge), /* tuplesToBeSet */ ) if reallocated { o.outVecs.SetBatch(o.output) diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 295889f50a78..475b265910d3 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -950,7 +950,7 @@ func (s *vectorizedFlowCreator) setupInput( os := colexec.NewOrderedSynchronizer( colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory), execinfra.GetWorkMemLimit(flowCtx), inputStreamOps, - input.ColumnTypes, execinfrapb.ConvertToColumnOrdering(input.Ordering), + input.ColumnTypes, execinfrapb.ConvertToColumnOrdering(input.Ordering), 0, /* tuplesToMerge */ ) opWithMetaInfo = colexecargs.OpWithMetaInfo{ Root: os, diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go index a946307bb454..1cf217244aef 100644 --- a/pkg/sql/colmem/allocator.go +++ b/pkg/sql/colmem/allocator.go @@ -726,7 +726,7 @@ func (h *AccountingHelper) Init(allocator *Allocator, memoryLimit int64) { // from that point). 
// // - tuplesToBeSet, if positive, indicates the total number of tuples that are -// yet to be set, use 0 if unknown. +// yet to be set. Zero and negative values are ignored. // // NOTE: if the reallocation occurs, then the memory under the old batch is // released, so it is expected that the caller will lose the references to the @@ -754,7 +754,7 @@ func (h *AccountingHelper) ResetMaybeReallocate( } } } - // Protect from the misuse. + // Ignore the negative values. if tuplesToBeSet < 0 { tuplesToBeSet = 0 } @@ -889,7 +889,7 @@ func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 { // AccountingHelper.ResetMaybeReallocate (and thus has the same contract) with // an additional logic for memory tracking purposes. // - tuplesToBeSet, if positive, indicates the total number of tuples that are -// yet to be set, use 0 if unknown. +// yet to be set. Zero and negative values are ignored. func (h *SetAccountingHelper) ResetMaybeReallocate( typs []*types.T, oldBatch coldata.Batch, tuplesToBeSet int, ) (newBatch coldata.Batch, reallocated bool) {