diff --git a/pkg/sql/colexec/colexecdisk/external_sort.go b/pkg/sql/colexec/colexecdisk/external_sort.go
index 8014a8f518f1..6d899925a8fd 100644
--- a/pkg/sql/colexec/colexecdisk/external_sort.go
+++ b/pkg/sql/colexec/colexecdisk/external_sort.go
@@ -673,10 +673,13 @@ func (s *externalSorter) createMergerForPartitions(n int) *colexec.OrderedSynchr
 		)
 	}
 
-	// Calculate the limit on the output batch mem size.
+	// Calculate the limit on the output batch mem size as well as the total
+	// number of tuples to merge.
 	outputBatchMemSize := s.mergeMemoryLimit
+	var tuplesToMerge int64
 	for i := s.numPartitions - n; i < s.numPartitions; i++ {
 		outputBatchMemSize -= s.partitionsInfo.maxBatchMemSize[i]
+		tuplesToMerge += int64(s.partitionsInfo.tupleCount[i])
 		s.resetPartitionsInfo(i)
 	}
 	// It is possible that the expected usage of the dequeued batches already
@@ -689,7 +692,8 @@ func (s *externalSorter) createMergerForPartitions(n int) *colexec.OrderedSynchr
 		outputBatchMemSize = minOutputBatchMemSize
 	}
 	return colexec.NewOrderedSynchronizer(
-		s.outputUnlimitedAllocator, outputBatchMemSize, syncInputs, s.inputTypes, s.columnOrdering,
+		s.outputUnlimitedAllocator, outputBatchMemSize, syncInputs,
+		s.inputTypes, s.columnOrdering, tuplesToMerge,
 	)
 }
 
diff --git a/pkg/sql/colexec/ordered_synchronizer.eg.go b/pkg/sql/colexec/ordered_synchronizer.eg.go
index f143a8a572d3..d71a8ea32639 100644
--- a/pkg/sql/colexec/ordered_synchronizer.eg.go
+++ b/pkg/sql/colexec/ordered_synchronizer.eg.go
@@ -43,6 +43,9 @@ type OrderedSynchronizer struct {
 	ordering              colinfo.ColumnOrdering
 	typs                  []*types.T
 	canonicalTypeFamilies []types.Family
+	// tuplesToMerge (when positive) tracks the number of tuples that are still
+	// to be merged by the synchronizer.
+	tuplesToMerge int64
 
 	// inputBatches stores the current batch for each input.
 	inputBatches []coldata.Batch
@@ -80,18 +83,22 @@ func (o *OrderedSynchronizer) Child(nth int, verbose bool) execopnode.OpNode {
 
 // NewOrderedSynchronizer creates a new OrderedSynchronizer.
 // - memoryLimit will limit the size of batches produced by the synchronizer.
+// - tuplesToMerge, if positive, indicates the total number of tuples that will
+// be emitted by all inputs; use 0 if unknown.
 func NewOrderedSynchronizer(
 	allocator *colmem.Allocator,
 	memoryLimit int64,
 	inputs []colexecargs.OpWithMetaInfo,
 	typs []*types.T,
 	ordering colinfo.ColumnOrdering,
+	tuplesToMerge int64,
 ) *OrderedSynchronizer {
 	os := &OrderedSynchronizer{
 		inputs:                inputs,
 		ordering:              ordering,
 		typs:                  typs,
 		canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs),
+		tuplesToMerge:         tuplesToMerge,
 	}
 	os.accountingHelper.Init(allocator, memoryLimit, typs)
 	return os
@@ -256,13 +263,16 @@ func (o *OrderedSynchronizer) Next() coldata.Batch {
 	}
 
 	o.output.SetLength(outputIdx)
+	// Note that it's ok if this number becomes negative - the accounting helper
+	// will ignore it.
+	o.tuplesToMerge -= int64(outputIdx)
 	return o.output
 }
 
 func (o *OrderedSynchronizer) resetOutput() {
 	var reallocated bool
 	o.output, reallocated = o.accountingHelper.ResetMaybeReallocate(
-		o.typs, o.output, 0, /* tuplesToBeSet */
+		o.typs, o.output, int(o.tuplesToMerge), /* tuplesToBeSet */
 	)
 	if reallocated {
 		o.outVecs.SetBatch(o.output)
diff --git a/pkg/sql/colexec/ordered_synchronizer_test.go b/pkg/sql/colexec/ordered_synchronizer_test.go
index e1e1af66b022..140026e68e74 100644
--- a/pkg/sql/colexec/ordered_synchronizer_test.go
+++ b/pkg/sql/colexec/ordered_synchronizer_test.go
@@ -148,7 +148,7 @@ func TestOrderedSync(t *testing.T) {
 			typs[i] = types.Int
 		}
 		colexectestutils.RunTests(t, testAllocator, tc.sources, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) {
-			return NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering), nil
+			return NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering, 0 /* tuplesToMerge */), nil
 		})
 	}
 }
@@ -189,7 +189,7 @@ func TestOrderedSyncRandomInput(t *testing.T) {
 		inputs[i].Root = colexectestutils.NewOpTestInput(testAllocator, batchSize, sources[i], typs)
 	}
 	ordering := colinfo.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
-	op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering)
+	op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */)
 	op.Init(context.Background())
 	out := colexectestutils.NewOpTestOutput(op, expected)
 	if err := out.Verify(); err != nil {
@@ -218,7 +218,7 @@ func BenchmarkOrderedSynchronizer(b *testing.B) {
 	}
 
 	ordering := colinfo.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
-	op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering)
+	op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */)
 	op.Init(ctx)
 
 	b.SetBytes(8 * int64(coldata.BatchSize()) * numInputs)
diff --git a/pkg/sql/colexec/ordered_synchronizer_tmpl.go b/pkg/sql/colexec/ordered_synchronizer_tmpl.go
index 05e159a8d820..96c308334659 100644
--- a/pkg/sql/colexec/ordered_synchronizer_tmpl.go
+++ b/pkg/sql/colexec/ordered_synchronizer_tmpl.go
@@ -64,6 +64,9 @@ type OrderedSynchronizer struct {
 	ordering              colinfo.ColumnOrdering
 	typs                  []*types.T
 	canonicalTypeFamilies []types.Family
+	// tuplesToMerge (when positive) tracks the number of tuples that are still
+	// to be merged by the synchronizer.
+	tuplesToMerge int64
 
 	// inputBatches stores the current batch for each input.
 	inputBatches []coldata.Batch
@@ -101,18 +104,22 @@ func (o *OrderedSynchronizer) Child(nth int, verbose bool) execopnode.OpNode {
 
 // NewOrderedSynchronizer creates a new OrderedSynchronizer.
 // - memoryLimit will limit the size of batches produced by the synchronizer.
+// - tuplesToMerge, if positive, indicates the total number of tuples that will
+// be emitted by all inputs; use 0 if unknown.
 func NewOrderedSynchronizer(
 	allocator *colmem.Allocator,
 	memoryLimit int64,
 	inputs []colexecargs.OpWithMetaInfo,
 	typs []*types.T,
 	ordering colinfo.ColumnOrdering,
+	tuplesToMerge int64,
 ) *OrderedSynchronizer {
 	os := &OrderedSynchronizer{
 		inputs:                inputs,
 		ordering:              ordering,
 		typs:                  typs,
 		canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs),
+		tuplesToMerge:         tuplesToMerge,
 	}
 	os.accountingHelper.Init(allocator, memoryLimit, typs)
 	return os
@@ -204,13 +211,16 @@ func (o *OrderedSynchronizer) Next() coldata.Batch {
 	}
 
 	o.output.SetLength(outputIdx)
+	// Note that it's ok if this number becomes negative - the accounting helper
+	// will ignore it.
+	o.tuplesToMerge -= int64(outputIdx)
 	return o.output
 }
 
 func (o *OrderedSynchronizer) resetOutput() {
 	var reallocated bool
 	o.output, reallocated = o.accountingHelper.ResetMaybeReallocate(
-		o.typs, o.output, 0, /* tuplesToBeSet */
+		o.typs, o.output, int(o.tuplesToMerge), /* tuplesToBeSet */
 	)
 	if reallocated {
 		o.outVecs.SetBatch(o.output)
diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go
index 295889f50a78..475b265910d3 100644
--- a/pkg/sql/colflow/vectorized_flow.go
+++ b/pkg/sql/colflow/vectorized_flow.go
@@ -950,7 +950,7 @@ func (s *vectorizedFlowCreator) setupInput(
 			os := colexec.NewOrderedSynchronizer(
 				colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory),
 				execinfra.GetWorkMemLimit(flowCtx), inputStreamOps,
-				input.ColumnTypes, execinfrapb.ConvertToColumnOrdering(input.Ordering),
+				input.ColumnTypes, execinfrapb.ConvertToColumnOrdering(input.Ordering), 0, /* tuplesToMerge */
 			)
 			opWithMetaInfo = colexecargs.OpWithMetaInfo{
 				Root: os,
diff --git a/pkg/sql/colmem/allocator.go b/pkg/sql/colmem/allocator.go
index a946307bb454..1cf217244aef 100644
--- a/pkg/sql/colmem/allocator.go
+++ b/pkg/sql/colmem/allocator.go
@@ -726,7 +726,7 @@ func (h *AccountingHelper) Init(allocator *Allocator, memoryLimit int64) {
 // from that point).
 //
 // - tuplesToBeSet, if positive, indicates the total number of tuples that are
-// yet to be set, use 0 if unknown.
+// yet to be set. Zero and negative values are ignored.
 //
 // NOTE: if the reallocation occurs, then the memory under the old batch is
 // released, so it is expected that the caller will lose the references to the
@@ -754,7 +754,7 @@ func (h *AccountingHelper) ResetMaybeReallocate(
 			}
 		}
 	}
-	// Protect from the misuse.
+	// Ignore negative values.
 	if tuplesToBeSet < 0 {
 		tuplesToBeSet = 0
 	}
@@ -889,7 +889,7 @@ func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 {
 // AccountingHelper.ResetMaybeReallocate (and thus has the same contract) with
 // an additional logic for memory tracking purposes.
 // - tuplesToBeSet, if positive, indicates the total number of tuples that are
-// yet to be set, use 0 if unknown.
+// yet to be set. Zero and negative values are ignored.
 func (h *SetAccountingHelper) ResetMaybeReallocate(
 	typs []*types.T, oldBatch coldata.Batch, tuplesToBeSet int,
 ) (newBatch coldata.Batch, reallocated bool) {
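
Below is a minimal, self-contained Go sketch of the sizing rule this patch relies on. The helper nextBatchCapacity, its parameters, and the constants are hypothetical; the real decision is made inside colmem.AccountingHelper.ResetMaybeReallocate. It only illustrates the documented contract: a positive tuplesToBeSet caps the capacity of the next output batch, while zero and negative values are ignored, which is why the synchronizer can safely let tuplesToMerge drift below zero on the last batch.

package main

import "fmt"

// nextBatchCapacity is a hypothetical stand-in for the capacity decision made
// by colmem.AccountingHelper.ResetMaybeReallocate. It grows the previous
// capacity up to maxCapacity while respecting a rough memory limit and, when
// tuplesToBeSet is positive, never allocates room for more tuples than are
// still to be set.
func nextBatchCapacity(oldCapacity, maxCapacity int, memoryLimit, perTupleSize int64, tuplesToBeSet int) int {
	// Zero and negative values are ignored, matching the documented contract.
	if tuplesToBeSet < 0 {
		tuplesToBeSet = 0
	}
	// Double the previous capacity (a common growth strategy), starting at 1.
	capacity := 2 * oldCapacity
	if capacity == 0 {
		capacity = 1
	}
	if capacity > maxCapacity {
		capacity = maxCapacity
	}
	// Respect the memory limit using the per-tuple footprint estimate.
	if perTupleSize > 0 {
		if byLimit := int(memoryLimit / perTupleSize); byLimit > 0 && byLimit < capacity {
			capacity = byLimit
		}
	}
	// A positive tuplesToBeSet caps the capacity: there is no point in
	// allocating room for tuples that will never be written.
	if tuplesToBeSet > 0 && tuplesToBeSet < capacity {
		capacity = tuplesToBeSet
	}
	return capacity
}

func main() {
	// Without the hint a full-sized batch is allocated even when only a few
	// tuples remain to be merged; with the hint the batch is right-sized.
	fmt.Println(nextBatchCapacity(1024, 1024, 64<<20, 64, 0))  // 1024
	fmt.Println(nextBatchCapacity(1024, 1024, 64<<20, 64, 37)) // 37
}

Under that assumption, the external sorter's final merge, which knows the exact tuple count of the partitions being merged, presumably avoids allocating a full-capacity output batch when only a small tail of tuples remains.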