Skip to content

Commit

Permalink
colexec: adds support for partial ordering in topk sorter
Browse files Browse the repository at this point in the history
Previously, topKSorter had to process all input rows before returning
the top K rows according to its specified ordering. If a subset of the
input rows were already ordered, topKSorter would still iterate over the
entire input.

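However, if the input is partially ordered, topKSorter can potentially
stop iterating early: once it has found K candidates and has finished
reading the current group of rows that share the same values in the
already-ordered columns, it is guaranteed not to find any better
candidates in the rest of the input.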

For example, take the following query and table with an index on a:

```
  a | b
----+----
  1 | 5
  2 | 3
  2 | 1
  3 | 3
  5 | 3

SELECT * FROM t ORDER BY a, b LIMIT 2
```

Given an index scan on `a` to provide `a`'s ordering, topKSorter only needs to
process 3 rows in order to guarantee that it has found the top K rows.
Once it finishes processing the third row `[2, 1]`, all subsequent rows
have higher values of `a` than the top 2 rows found so far, and
therefore cannot be in the top 2 rows.

This change modifies the signature of the vectorized engine's NewTopKSorter
constructor to take the length of the partial ordering (matchLen). The
TopKSorter splits the input into chunks of rows that share the same values
in the already-ordered columns and processes each chunk with its existing
heap algorithm. At the end of each chunk, if the heap already holds K rows,
TopKSorter emits the rows and stops execution.

This change also includes a new microbenchmark, BenchmarkSortTopK, that
evaluates TopKSorter with a varying number of partially ordered columns
and varying chunk sizes.

Release justification:
Release note (<category, see below>): <what> <show> <why>
  • Loading branch information
rharding6373 committed Sep 8, 2021
1 parent 7c36a9d commit a58f00e
Show file tree
Hide file tree
Showing 6 changed files with 199 additions and 51 deletions.
36 changes: 18 additions & 18 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,7 @@ func (r opResult) createDiskBackedSort(
totalMemLimit := execinfra.GetWorkMemLimit(flowCtx)
spoolMemLimit := totalMemLimit * 4 / 5
maxOutputBatchMemSize := totalMemLimit - spoolMemLimit
if matchLen > 0 {
// The input is already partially ordered. Use a chunks sorter to avoid
// loading all the rows into memory.
var sortChunksMemAccount *mon.BoundAccount
if useStreamingMemAccountForBuffering {
sortChunksMemAccount = streamingMemAccount
} else {
sortChunksMemAccount, sorterMemMonitorName = r.createMemAccountForSpillStrategyWithLimit(
ctx, flowCtx, spoolMemLimit, opNamePrefix+"sort-chunks", processorID,
)
}
inMemorySorter, err = colexec.NewSortChunks(
colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes,
ordering.Columns, int(matchLen), maxOutputBatchMemSize,
)
} else if post.Limit != 0 && post.Limit < math.MaxUint64-post.Offset {
if post.Limit != 0 && post.Limit < math.MaxUint64-post.Offset {
// There is a limit specified, so we know exactly how many rows the
// sorter should output. The last part of the condition is making sure
// there is no overflow.
Expand All @@ -405,9 +390,24 @@ func (r opResult) createDiskBackedSort(
)
}
topK = post.Limit + post.Offset
inMemorySorter = colexec.NewTopKSorter(
inMemorySorter, err = colexec.NewTopKSorter(
colmem.NewAllocator(ctx, topKSorterMemAccount, factory), input,
inputTypes, ordering.Columns, topK, maxOutputBatchMemSize,
inputTypes, ordering.Columns, int(matchLen), topK, maxOutputBatchMemSize,
)
} else if matchLen > 0 {
// The input is already partially ordered. Use a chunks sorter to avoid
// loading all the rows into memory.
var sortChunksMemAccount *mon.BoundAccount
if useStreamingMemAccountForBuffering {
sortChunksMemAccount = streamingMemAccount
} else {
sortChunksMemAccount, sorterMemMonitorName = r.createMemAccountForSpillStrategyWithLimit(
ctx, flowCtx, spoolMemLimit, opNamePrefix+"sort-chunks", processorID,
)
}
inMemorySorter, err = colexec.NewSortChunks(
colmem.NewAllocator(ctx, sortChunksMemAccount, factory), input, inputTypes,
ordering.Columns, int(matchLen), maxOutputBatchMemSize,
)
} else {
// No optimizations possible. Default to the standard sort operator.
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/colexec/external_sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,17 @@ func NewExternalSorter(
}
inputPartitioner := newInputPartitioningOperator(sortUnlimitedAllocator, input, inputTypes, inMemSortPartitionLimit)
var inMemSorter colexecop.ResettableOperator
var err error
if topK > 0 {
inMemSorter = NewTopKSorter(sortUnlimitedAllocator, inputPartitioner, inputTypes, ordering.Columns, topK, inMemSortOutputLimit)
inMemSorter, err = NewTopKSorter(sortUnlimitedAllocator, inputPartitioner, inputTypes, ordering.Columns, 0 /* matchLen */, topK, inMemSortOutputLimit)
} else {
var err error
inMemSorter, err = newSorter(
sortUnlimitedAllocator, newAllSpooler(sortUnlimitedAllocator, inputPartitioner, inputTypes),
inputTypes, ordering.Columns, inMemSortOutputLimit,
)
if err != nil {
colexecerror.InternalError(err)
}
}
if err != nil {
colexecerror.InternalError(err)
}
partitionedDiskQueueSemaphore := fdSemaphore
if !delegateFDAcquisitions {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colexec/external_sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func TestExternalSortRandomized(t *testing.T) {
// TODO(asubiotto): Not implemented yet, currently we rely on the
// flow tracking open FDs and releasing any leftovers.
var semsToCheck []semaphore.Semaphore
tups, expected, ordCols := generateRandomDataForTestSort(rng, nTups, nCols, nOrderingCols)
tups, expected, ordCols := generateRandomDataForTestSort(rng, nTups, nCols, nOrderingCols, 0 /* nPartialOrderingCols */)
colexectestutils.RunTests(
t,
testAllocator,
Expand Down
46 changes: 27 additions & 19 deletions pkg/sql/colexec/sort_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,19 +165,23 @@ func TestSortRandomized(t *testing.T) {
for nCols := 1; nCols < maxCols; nCols++ {
for nOrderingCols := 1; nOrderingCols <= nCols; nOrderingCols++ {
for _, k := range []int{0, rng.Intn(nTups) + 1} {
topK := k != 0
name := fmt.Sprintf("nCols=%d/nOrderingCols=%d/topK=%t", nCols, nOrderingCols, topK)
log.Infof(context.Background(), "%s", name)
tups, expected, ordCols := generateRandomDataForTestSort(rng, nTups, nCols, nOrderingCols)
if topK {
expected = expected[:k]
}
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
if topK {
return NewTopKSorter(testAllocator, input[0], typs[:nCols], ordCols, uint64(k), execinfra.DefaultMemoryLimit), nil
}
return NewSorter(testAllocator, input[0], typs[:nCols], ordCols, execinfra.DefaultMemoryLimit)
})
topK := k != 0
nPartialOrderingCols := 0
if topK {
nPartialOrderingCols = rng.Intn(nOrderingCols)
}
name := fmt.Sprintf("nCols=%d/nOrderingCols=%d/nPartialOrderingCols=%d/topK=%t", nCols, nOrderingCols, nPartialOrderingCols, topK)
log.Infof(context.Background(), "%s", name)
tups, expected, ordCols := generateRandomDataForTestSort(rng, nTups, nCols, nOrderingCols, nPartialOrderingCols)
if topK {
expected = expected[:k]
}
colexectestutils.RunTests(t, testAllocator, []colexectestutils.Tuples{tups}, expected, colexectestutils.OrderedVerifier, func(input []colexecop.Operator) (colexecop.Operator, error) {
if topK {
return NewTopKSorter(testAllocator, input[0], typs[:nCols], ordCols, nPartialOrderingCols /* matchLen */, uint64(k), execinfra.DefaultMemoryLimit)
}
return NewSorter(testAllocator, input[0], typs[:nCols], ordCols, execinfra.DefaultMemoryLimit)
})
}
}
}
Expand All @@ -189,9 +193,11 @@ func TestSortRandomized(t *testing.T) {
// - expected - the same data but already sorted
// - ordCols - ordering columns used in the sort operation.
func generateRandomDataForTestSort(
rng *rand.Rand, nTups, nCols, nOrderingCols int,
rng *rand.Rand, nTups, nCols, nOrderingCols, nPartialOrderingCols int,
) (tups, expected colexectestutils.Tuples, ordCols []execinfrapb.Ordering_Column) {
ordCols = generateColumnOrdering(rng, nCols, nOrderingCols)
partialOrdCols := make([]execinfrapb.Ordering_Column, nPartialOrderingCols)
copy(partialOrdCols, ordCols[0:nPartialOrderingCols])
tups = make(colexectestutils.Tuples, nTups)
for i := range tups {
tups[i] = make(colexectestutils.Tuple, nCols)
Expand All @@ -208,6 +214,7 @@ func generateRandomDataForTestSort(
tups[i][ordCols[nOrderingCols-1].ColIdx] = int64(i)
}

sort.Slice(tups, less(tups, partialOrdCols))
expected = make(colexectestutils.Tuples, nTups)
copy(expected, tups)
sort.Slice(expected, less(expected, ordCols))
Expand Down Expand Up @@ -315,14 +322,15 @@ func BenchmarkSort(b *testing.B) {
for n := 0; n < b.N; n++ {
source := colexectestutils.NewFiniteBatchSource(testAllocator, batch, typs, nBatches)
var sorter colexecop.Operator
var err error
if topK {
sorter = NewTopKSorter(testAllocator, source, typs, ordCols, k, execinfra.DefaultMemoryLimit)
// TODO(harding): Randomize partial ordering columns, too.
sorter, err = NewTopKSorter(testAllocator, source, typs, ordCols, 0 /* matchLen */, k, execinfra.DefaultMemoryLimit)
} else {
var err error
sorter, err = NewSorter(testAllocator, source, typs, ordCols, execinfra.DefaultMemoryLimit)
if err != nil {
b.Fatal(err)
}
}
if err != nil {
b.Fatal(err)
}
sorter.Init(ctx)
for out := sorter.Next(); out.Length() != 0; out = sorter.Next() {
Expand Down
54 changes: 49 additions & 5 deletions pkg/sql/colexec/sorttopk.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package colexec
import (
"container/heap"
"context"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"

"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecutils"
Expand All @@ -31,23 +32,38 @@ const (

// NewTopKSorter returns a new sort operator, which sorts its input on the
// columns given in orderingCols and returns the first K rows. The inputTypes
// must correspond 1-1 with the columns in the input operator.
// must correspond 1-1 with the columns in the input operator. If matchLen is
// non-zero, then the input tuples must already be sorted on the first matchLen
// columns of orderingCols.
func NewTopKSorter(
allocator *colmem.Allocator,
input colexecop.Operator,
inputTypes []*types.T,
orderingCols []execinfrapb.Ordering_Column,
matchLen int,
k uint64,
maxOutputBatchMemSize int64,
) colexecop.ResettableOperator {
return &topKSorter{
) (colexecop.ResettableOperator, error) {
base := &topKSorter{
allocator: allocator,
OneInputNode: colexecop.NewOneInputNode(input),
inputTypes: inputTypes,
orderingCols: orderingCols,
k: k,
maxOutputBatchMemSize: maxOutputBatchMemSize,
}
partialOrderCols := make([]uint32, matchLen)
for i := range partialOrderCols {
partialOrderCols[i] = orderingCols[i].ColIdx
}
var err error
base.distincterInput = &colexecop.FeedOperator{}
base.distincter, base.distinctOutput, err = colexecbase.OrderedDistinctColsToOperators(
base.distincterInput, partialOrderCols, inputTypes, false, /* nullsAreDistinct */
)
if err != nil {
return base, err
}
return base, nil
}

var _ colexecop.BufferingInMemoryOperator = &topKSorter{}
Expand All @@ -74,6 +90,7 @@ type topKSorter struct {

allocator *colmem.Allocator
orderingCols []execinfrapb.Ordering_Column
partialOrderingCols []execinfrapb.Ordering_Column
inputTypes []*types.T
k uint64

Expand All @@ -97,6 +114,12 @@ type topKSorter struct {
output coldata.Batch
maxOutputBatchMemSize int64

// distincter is an operator that groups an input batch by its partially
// ordered column values.
distincterInput *colexecop.FeedOperator
distincter colexecop.Operator
distinctOutput []bool

exportedFromTopK int
exportedFromBatch int
windowedBatch coldata.Batch
Expand Down Expand Up @@ -160,8 +183,14 @@ func (t *topKSorter) Reset(ctx context.Context) {
// determine the final output ordering. This is used in emit() to output the rows
// in sorted order.
func (t *topKSorter) spool() {
t.distincter.Init(t.Ctx)
t.distincter.(colexecop.Resetter).Reset(t.Ctx)
// Fill up t.topK by spooling up to K rows from the input.
// We don't need to check for distinct groups until after we have filled
// t.topK.
t.inputBatch = t.Input.Next()
t.distincterInput.SetBatch(t.inputBatch)
t.distincter.Next()
remainingRows := t.k
for remainingRows > 0 && t.inputBatch.Length() > 0 {
fromLength := t.inputBatch.Length()
Expand All @@ -174,6 +203,8 @@ func (t *topKSorter) spool() {
remainingRows -= uint64(fromLength)
if fromLength == t.inputBatch.Length() {
t.inputBatch = t.Input.Next()
t.distincterInput.SetBatch(t.inputBatch)
t.distincter.Next()
t.firstUnprocessedTupleIdx = 0
}
}
Expand All @@ -191,8 +222,10 @@ func (t *topKSorter) spool() {
heap.Init(t)

// Read the remainder of the input. Whenever a row is less than the heap max,
// swap it in.
for t.inputBatch.Length() > 0 {
// swap it in. When we find the end of the group, we can finish reading the
// input.
groupDone := false
for t.inputBatch.Length() > 0{
t.updateComparators(inputVecIdx, t.inputBatch)
sel := t.inputBatch.Selection()
t.allocator.PerformOperation(
Expand All @@ -203,6 +236,12 @@ func (t *topKSorter) spool() {
if sel != nil {
idx = sel[i]
}
// If this row starts a new distinct group, we have already found the top K
// rows, so we can stop comparing the rest of this and subsequent batches.
if t.distinctOutput[idx] {
groupDone = true
return
}
maxIdx := t.heap[0]
if t.compareRow(inputVecIdx, topKVecIdx, idx, maxIdx) < 0 {
for j := range t.inputTypes {
Expand All @@ -214,7 +253,12 @@ func (t *topKSorter) spool() {
t.firstUnprocessedTupleIdx = t.inputBatch.Length()
},
)
if groupDone {
break
}
t.inputBatch = t.Input.Next()
t.distincterInput.SetBatch(t.inputBatch)
t.distincter.Next()
t.firstUnprocessedTupleIdx = 0
}

Expand Down
Loading

0 comments on commit a58f00e

Please sign in to comment.