Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
87674: colexecdisk: improve the merge in external sort a bit r=yuzefovich a=yuzefovich

In order to perform the repeated merging of partitions in the external sort we use the ordered synchronizer. Previously, we would not specify the number of tuples that would be merged by the synchronizer which resulted in its output batch growing dynamically. However, we do know exactly that number, so we can give the hint to the synchronizer to attempt allocating the largest batch right away. This should speed up the external sort a bit (I did run our microbenchmarks, and they didn't notice any changes because we don't use "interesting" memory limits (either too large or too small for this change to make impact)).

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Sep 12, 2022
2 parents 773568f + edd8549 commit 6309178
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 11 deletions.
8 changes: 6 additions & 2 deletions pkg/sql/colexec/colexecdisk/external_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,10 +673,13 @@ func (s *externalSorter) createMergerForPartitions(n int) *colexec.OrderedSynchr
)
}

// Calculate the limit on the output batch mem size.
// Calculate the limit on the output batch mem size as well as the total
// number of tuples to merge.
outputBatchMemSize := s.mergeMemoryLimit
var tuplesToMerge int64
for i := s.numPartitions - n; i < s.numPartitions; i++ {
outputBatchMemSize -= s.partitionsInfo.maxBatchMemSize[i]
tuplesToMerge += int64(s.partitionsInfo.tupleCount[i])
s.resetPartitionsInfo(i)
}
// It is possible that the expected usage of the dequeued batches already
Expand All @@ -689,7 +692,8 @@ func (s *externalSorter) createMergerForPartitions(n int) *colexec.OrderedSynchr
outputBatchMemSize = minOutputBatchMemSize
}
return colexec.NewOrderedSynchronizer(
s.outputUnlimitedAllocator, outputBatchMemSize, syncInputs, s.inputTypes, s.columnOrdering,
s.outputUnlimitedAllocator, outputBatchMemSize, syncInputs,
s.inputTypes, s.columnOrdering, tuplesToMerge,
)
}

Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/colexec/ordered_synchronizer.eg.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pkg/sql/colexec/ordered_synchronizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestOrderedSync(t *testing.T) {
typs[i] = types.Int
}
colexectestutils.RunTests(t, testAllocator, tc.sources, tc.expected, colexectestutils.OrderedVerifier, func(inputs []colexecop.Operator) (colexecop.Operator, error) {
return NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering), nil
return NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, colexectestutils.MakeInputs(inputs), typs, tc.ordering, 0 /* tuplesToMerge */), nil
})
}
}
Expand Down Expand Up @@ -189,7 +189,7 @@ func TestOrderedSyncRandomInput(t *testing.T) {
inputs[i].Root = colexectestutils.NewOpTestInput(testAllocator, batchSize, sources[i], typs)
}
ordering := colinfo.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering)
op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */)
op.Init(context.Background())
out := colexectestutils.NewOpTestOutput(op, expected)
if err := out.Verify(); err != nil {
Expand Down Expand Up @@ -218,7 +218,7 @@ func BenchmarkOrderedSynchronizer(b *testing.B) {
}

ordering := colinfo.ColumnOrdering{{ColIdx: 0, Direction: encoding.Ascending}}
op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering)
op := NewOrderedSynchronizer(testAllocator, execinfra.DefaultMemoryLimit, inputs, typs, ordering, 0 /* tuplesToMerge */)
op.Init(ctx)

b.SetBytes(8 * int64(coldata.BatchSize()) * numInputs)
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/colexec/ordered_synchronizer_tmpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type OrderedSynchronizer struct {
ordering colinfo.ColumnOrdering
typs []*types.T
canonicalTypeFamilies []types.Family
// tuplesToMerge (when positive) tracks the number of tuples that are still
// to be merged by synchronizer.
tuplesToMerge int64

// inputBatches stores the current batch for each input.
inputBatches []coldata.Batch
Expand Down Expand Up @@ -101,18 +104,22 @@ func (o *OrderedSynchronizer) Child(nth int, verbose bool) execopnode.OpNode {

// NewOrderedSynchronizer creates a new OrderedSynchronizer.
// - memoryLimit will limit the size of batches produced by the synchronizer.
// - tuplesToMerge, if positive, indicates the total number of tuples that will
// be emitted by all inputs, use 0 if unknown.
func NewOrderedSynchronizer(
allocator *colmem.Allocator,
memoryLimit int64,
inputs []colexecargs.OpWithMetaInfo,
typs []*types.T,
ordering colinfo.ColumnOrdering,
tuplesToMerge int64,
) *OrderedSynchronizer {
os := &OrderedSynchronizer{
inputs: inputs,
ordering: ordering,
typs: typs,
canonicalTypeFamilies: typeconv.ToCanonicalTypeFamilies(typs),
tuplesToMerge: tuplesToMerge,
}
os.accountingHelper.Init(allocator, memoryLimit, typs)
return os
Expand Down Expand Up @@ -204,13 +211,16 @@ func (o *OrderedSynchronizer) Next() coldata.Batch {
}

o.output.SetLength(outputIdx)
// Note that it's ok if this number becomes negative - the accounting helper
// will ignore it.
o.tuplesToMerge -= int64(outputIdx)
return o.output
}

func (o *OrderedSynchronizer) resetOutput() {
var reallocated bool
o.output, reallocated = o.accountingHelper.ResetMaybeReallocate(
o.typs, o.output, 0, /* tuplesToBeSet */
o.typs, o.output, int(o.tuplesToMerge), /* tuplesToBeSet */
)
if reallocated {
o.outVecs.SetBatch(o.output)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -950,7 +950,7 @@ func (s *vectorizedFlowCreator) setupInput(
os := colexec.NewOrderedSynchronizer(
colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory),
execinfra.GetWorkMemLimit(flowCtx), inputStreamOps,
input.ColumnTypes, execinfrapb.ConvertToColumnOrdering(input.Ordering),
input.ColumnTypes, execinfrapb.ConvertToColumnOrdering(input.Ordering), 0, /* tuplesToMerge */
)
opWithMetaInfo = colexecargs.OpWithMetaInfo{
Root: os,
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/colmem/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ func (h *AccountingHelper) Init(allocator *Allocator, memoryLimit int64) {
// from that point).
//
// - tuplesToBeSet, if positive, indicates the total number of tuples that are
// yet to be set, use 0 if unknown.
// yet to be set. Zero and negative values are ignored.
//
// NOTE: if the reallocation occurs, then the memory under the old batch is
// released, so it is expected that the caller will lose the references to the
Expand Down Expand Up @@ -754,7 +754,7 @@ func (h *AccountingHelper) ResetMaybeReallocate(
}
}
}
// Protect from the misuse.
// Ignore the negative values.
if tuplesToBeSet < 0 {
tuplesToBeSet = 0
}
Expand Down Expand Up @@ -889,7 +889,7 @@ func (h *SetAccountingHelper) getBytesLikeTotalSize() int64 {
// AccountingHelper.ResetMaybeReallocate (and thus has the same contract) with
// an additional logic for memory tracking purposes.
// - tuplesToBeSet, if positive, indicates the total number of tuples that are
// yet to be set, use 0 if unknown.
// yet to be set. Zero and negative values are ignored.
func (h *SetAccountingHelper) ResetMaybeReallocate(
typs []*types.T, oldBatch coldata.Batch, tuplesToBeSet int,
) (newBatch coldata.Batch, reallocated bool) {
Expand Down

0 comments on commit 6309178

Please sign in to comment.