Skip to content

Commit

Permalink
colexec: adds support for partial ordering in topk sorter
Browse files Browse the repository at this point in the history
Previously, topKSorter had to process all input rows before returning
the top K rows according to its specified ordering. Even if the input
was already ordered on a prefix of the ordering columns, topKSorter
would still iterate over the entire input.

However, if the input is partially ordered, topKSorter can stop
iterating early: once it has K candidates and has finished the current
group of rows that tie on the already-ordered columns, no later row can
be a better candidate.

For example, take the following query and table with an index on a:

```
  a | b
----+----
  1 | 5
  2 | 3
  2 | 1
  3 | 3
  5 | 3

SELECT * FROM t ORDER BY a, b LIMIT 2
```

Given an index scan on a to provide a's ordering, topk only needs to
process 3 rows in order to guarantee that it has found the top K rows.
Once it finishes processing the third row [2, 1], all subsequent rows
have higher values of a than the top 2 rows found so far, and
therefore cannot be in the top 2 rows.

This change modifies the vectorized engine's TopKSorter signature to include
a partial ordering. The TopKSorter chunks the input according to the
sorted columns and processes each chunk with its existing heap
algorithm. Row comparison in the heap is also optimized so that tuples
in the same chunk only compare non-sorted columns. At the end
of each chunk, if K rows are in the heap, TopKSorter emits the rows and
stops execution.

A later commit, once merged with top K optimizer and distsql changes,
will adjust the cost model for top K to reflect this change.

Informs cockroachdb#69724

Release note: None
  • Loading branch information
rharding6373 committed Sep 21, 2021
1 parent 339f1c2 commit afc78d5
Show file tree
Hide file tree
Showing 19 changed files with 962 additions and 149 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/rowstovec.eg.go \
pkg/sql/colexec/select_in.eg.go \
pkg/sql/colexec/sort.eg.go \
pkg/sql/colexec/sorttopk.eg.go \
pkg/sql/colexec/sort_partitioner.eg.go \
pkg/sql/colexec/substring.eg.go \
pkg/sql/colexec/values_differ.eg.go \
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ targets = [
("rowstovec.eg.go", "rowstovec_tmpl.go"),
("select_in.eg.go", "select_in_tmpl.go"),
("sort.eg.go", "sort_tmpl.go"),
("sorttopk.eg.go", "sorttopk_tmpl.go"),
("substring.eg.go", "substring_tmpl.go"),
("values_differ.eg.go", "values_differ_tmpl.go"),
("vec_comparators.eg.go", "vec_comparators_tmpl.go"),
Expand Down
35 changes: 18 additions & 17 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,23 @@ func (r opResult) createDiskBackedSort(
totalMemLimit := execinfra.GetWorkMemLimit(flowCtx)
spoolMemLimit := totalMemLimit * 4 / 5
maxOutputBatchMemSize := totalMemLimit - spoolMemLimit
if matchLen > 0 {
if limit != 0 {
// There is a limit specified, so we know exactly how many rows the
// sorter should output. Use a top K sorter, which uses a heap to avoid
// storing more rows than necessary.
var topKSorterMemAccount *mon.BoundAccount
if useStreamingMemAccountForBuffering {
topKSorterMemAccount = streamingMemAccount
} else {
topKSorterMemAccount, sorterMemMonitorName = r.createMemAccountForSpillStrategyWithLimit(
ctx, flowCtx, spoolMemLimit, opNamePrefix+"topk-sort", processorID,
)
}
inMemorySorter, err = colexec.NewTopKSorter(
colmem.NewAllocator(ctx, topKSorterMemAccount, factory), input,
inputTypes, ordering.Columns, int(matchLen), uint64(limit), maxOutputBatchMemSize,
)
} else if matchLen > 0 {
// The input is already partially ordered. Use a chunks sorter to avoid
// loading all the rows into memory.
var sortChunksMemAccount *mon.BoundAccount
Expand All @@ -384,22 +400,6 @@ func (r opResult) createDiskBackedSort(
colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes,
ordering.Columns, int(matchLen), maxOutputBatchMemSize,
)
} else if limit != 0 {
// There is a limit specified, so we know exactly how many rows the
// sorter should output. Use a top K sorter, which uses a heap to avoid
// storing more rows than necessary.
var topKSorterMemAccount *mon.BoundAccount
if useStreamingMemAccountForBuffering {
topKSorterMemAccount = streamingMemAccount
} else {
topKSorterMemAccount, sorterMemMonitorName = r.createMemAccountForSpillStrategyWithLimit(
ctx, flowCtx, spoolMemLimit, opNamePrefix+"topk-sort", processorID,
)
}
inMemorySorter = colexec.NewTopKSorter(
colmem.NewAllocator(ctx, topKSorterMemAccount, factory), input,
inputTypes, ordering.Columns, uint64(limit), maxOutputBatchMemSize,
)
} else {
// No optimizations possible. Default to the standard sort operator.
var sorterMemAccount *mon.BoundAccount
Expand Down Expand Up @@ -456,6 +456,7 @@ func (r opResult) createDiskBackedSort(
mergeUnlimitedAllocator,
outputUnlimitedAllocator,
input, inputTypes, ordering, uint64(limit),
int(matchLen),
execinfra.GetWorkMemLimit(flowCtx),
maxNumberPartitions,
args.TestingKnobs.NumForcedRepartitions,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/colexec/execgen/cmd/execgen/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ go_library(
"select_in_gen.go",
"selection_ops_gen.go",
"sort_gen.go",
"sorttopk_gen.go",
"span_assembler_gen.go",
"span_encoder_gen.go",
"substring_gen.go",
Expand Down
27 changes: 27 additions & 0 deletions pkg/sql/colexec/execgen/cmd/execgen/sorttopk_gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package main

import (
"fmt"
"io"
)

// sortTopKTmpl is the repository-relative path of the top K sorter template
// that is registered below as the input for generating sorttopk.eg.go.
const sortTopKTmpl = "pkg/sql/colexec/sorttopk_tmpl.go"

// genSortTopK writes inputFileContents to wr verbatim; no substitutions are
// applied to the template text. It returns whatever error the write reports,
// or nil when the write succeeds.
func genSortTopK(inputFileContents string, wr io.Writer) error {
	if _, writeErr := fmt.Fprint(wr, inputFileContents); writeErr != nil {
		return writeErr
	}
	return nil
}

// init registers genSortTopK with the execgen tool, passing the generated
// file name "sorttopk.eg.go" and the template path sortTopKTmpl.
func init() {
	registerGenerator(
		genSortTopK,
		"sorttopk.eg.go",
		sortTopKTmpl,
	)
}
11 changes: 6 additions & 5 deletions pkg/sql/colexec/external_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func NewExternalSorter(
inputTypes []*types.T,
ordering execinfrapb.Ordering,
topK uint64,
matchLen int,
memoryLimit int64,
maxNumberPartitions int,
numForcedMerges int,
Expand Down Expand Up @@ -271,17 +272,17 @@ func NewExternalSorter(
}
inputPartitioner := newInputPartitioningOperator(sortUnlimitedAllocator, input, inputTypes, inMemSortPartitionLimit)
var inMemSorter colexecop.ResettableOperator
var err error
if topK > 0 {
inMemSorter = NewTopKSorter(sortUnlimitedAllocator, inputPartitioner, inputTypes, ordering.Columns, topK, inMemSortOutputLimit)
inMemSorter, err = NewTopKSorter(sortUnlimitedAllocator, inputPartitioner, inputTypes, ordering.Columns, matchLen, topK, inMemSortOutputLimit)
} else {
var err error
inMemSorter, err = newSorter(
sortUnlimitedAllocator, newAllSpooler(sortUnlimitedAllocator, inputPartitioner, inputTypes),
inputTypes, ordering.Columns, inMemSortOutputLimit,
)
if err != nil {
colexecerror.InternalError(err)
}
}
if err != nil {
colexecerror.InternalError(err)
}
partitionedDiskQueueSemaphore := fdSemaphore
if !delegateFDAcquisitions {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/colexec/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestSortRandomized(t *testing.T) {
}
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
if topK {
return NewTopKSorter(testAllocator, input[0], typs[:nCols], ordCols, uint64(k), execinfra.DefaultMemoryLimit), nil
return NewTopKSorter(testAllocator, input[0], typs[:nCols], ordCols, matchLen, uint64(k), execinfra.DefaultMemoryLimit)
}
return NewSorter(testAllocator, input[0], typs[:nCols], ordCols, execinfra.DefaultMemoryLimit)
})
Expand Down Expand Up @@ -323,7 +323,7 @@ func BenchmarkSort(b *testing.B) {
var sorter colexecop.Operator
var err error
if topK {
sorter, err = NewTopKSorter(testAllocator, source, typs, ordCols, k, execinfra.DefaultMemoryLimit), nil
sorter, err = NewTopKSorter(testAllocator, source, typs, ordCols, 0 /* matchLen */, k, execinfra.DefaultMemoryLimit)
} else {
sorter, err = NewSorter(testAllocator, source, typs, ordCols, execinfra.DefaultMemoryLimit)
}
Expand Down
Loading

0 comments on commit afc78d5

Please sign in to comment.