Skip to content

Commit

Permalink
opt: take advantage of partial ordering in topk sorter
Browse files Browse the repository at this point in the history
Recent improvements to TopK in colexec allow TopK to stop execution
early and emit its output if its sort columns were partially ordered in
the input rows. This change modifies the optimizer so that it can
find lower cost TopK plans.

This change adds two new exploration rules: `GenerateLimitedTopKScans`
and `GeneratePartialOrderTopK`. The first rule is similar to
`GenerateLimitedGroupByScans` in that it looks for secondary indexes
that could provide a partial ordering and adds the secondary index scan
and an index join to get the rest of the columns to the memo. This
allows us to explore cases of partially ordered inputs (via the index
scan) to TopK. The second rule is similar to `GenerateStreamingGroupBy`
in that it uses interesting orderings to find partial orderings.

The cost model is also updated to reflect the new estimated limit on the
number of rows TopK needs to process to find the top K rows. The limit
is propagated to TopK's child expressions as a limit hint.

Fixes: cockroachdb#69724

Release note (sql change): Improves cost model for TopK expressions if the
input to TopK can be partially ordered by its sort columns.
  • Loading branch information
rharding6373 committed Dec 16, 2021
1 parent ca800e2 commit 834de68
Show file tree
Hide file tree
Showing 17 changed files with 759 additions and 76 deletions.
2 changes: 2 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/topk
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,14 @@ vectorized: true
└── • index join
│ columns: (k, v, w)
│ ordering: +v
│ estimated row count: 1,000 (missing stats)
│ table: t@t_pkey
│ key columns: k
└── • scan
columns: (k, v)
ordering: +v
estimated row count: 1,000 (missing stats)
table: t@v
spans: FULL SCAN
2 changes: 1 addition & 1 deletion pkg/sql/opt/memo/testdata/memo
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ memo (optimized, ~20KB, required=[presentation: y:2,x:5,c:10] [ordering: +2])
│ └── []
│ ├── best: (project G2 G3 y x)
│ └── cost: 1754.72
├── G2: (limit G4 G5 ordering=+2) (top-k G4 &{10 +2})
├── G2: (limit G4 G5 ordering=+2) (top-k G4 &{10 +2 })
│ ├── [ordering: +2]
│ │ ├── best: (limit G4="[ordering: +2] [limit hint: 10.00]" G5 ordering=+2)
│ │ └── cost: 1754.51
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/opt/memo/testdata/stats/inverted-geo
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ memo (optimized, ~11KB, required=[presentation: i:1])
│ └── [presentation: i:1]
│ ├── best: (project G2 G3 i)
│ └── cost: 2652.94
├── G2: (limit G4 G5 ordering=+1) (top-k G4 &{1 +1})
├── G2: (limit G4 G5 ordering=+1) (top-k G4 &{1 +1 })
│ └── []
│ ├── best: (limit G4="[ordering: +1] [limit hint: 1.00]" G5 ordering=+1)
│ └── cost: 2652.92
Expand Down Expand Up @@ -200,7 +200,7 @@ memo (optimized, ~11KB, required=[presentation: i:1])
│ └── [presentation: i:1]
│ ├── best: (project G2 G3 i)
│ └── cost: 14.15
├── G2: (limit G4 G5 ordering=+1) (top-k G4 &{1 +1})
├── G2: (limit G4 G5 ordering=+1) (top-k G4 &{1 +1 })
│ └── []
│ ├── best: (limit G4="[ordering: +1] [limit hint: 1.00]" G5 ordering=+1)
│ └── cost: 14.13
Expand Down
28 changes: 19 additions & 9 deletions pkg/sql/opt/ops/relational.opt
Original file line number Diff line number Diff line change
Expand Up @@ -1042,17 +1042,20 @@ define Offset {
Ordering OrderingChoice
}

# TopK returns the top K, where K is a constant, rows from the input set according to its
# sort ordering, discarding the remaining rows. The Limit is a constant
# positive integer; the operator returns at most Limit rows. Rows can be sorted by one
# or more of the input columns, each of which can be sorted in either ascending
# or descending order. See the Ordering field in the PhysicalProps struct.
# TopK returns the top K, where K is a constant, rows from the input set
# according to its sort ordering, discarding the remaining rows. The Limit is a
# constant positive integer; the operator returns at most Limit rows. Rows can
# be sorted by one or more of the input columns, each of which can be sorted in
# either ascending or descending order. See the Ordering field in the
# PhysicalProps struct.
#
# Unlike the Limit relational operator, TopK does not require its input to be
# ordered. TopK can be used to substitute a Limit that requires its input to be
# ordered and performs best when the input is not already ordered. TopK scans the
# input, storing the K rows that best meet the ordering requirement in a max
# heap, then sorts the K rows.
# ordered. However, if the input is known to have a partial ordering of the
# required ordering, TopK can take advantage of optimizations. TopK can be used
# to substitute a Limit that requires its input to be ordered and performs best
# when the input is not already fully ordered. TopK scans the input, storing the
# K rows that best meet the ordering requirement in a max heap, then sorts the K
# rows.
[Relational]
define TopK {
Input RelExpr
Expand All @@ -1062,7 +1065,14 @@ define TopK {
[Private]
define TopKPrivate {
K int64

# Ordering is the required order in which the K rows should be sorted when output.
Ordering OrderingChoice

# PartialOrdering is an optional ordering imposed on the input that is
# a partial order of Ordering and allows TopK to take advantage of partial
# ordering optimizations.
PartialOrdering OrderingChoice
}

# Max1Row enforces that its input must return at most one row. If the input
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/opt/ordering/ordering.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func init() {
}
funcMap[opt.TopKOp] = funcs{
canProvideOrdering: topKCanProvideOrdering,
buildChildReqOrdering: noChildReqOrdering,
buildChildReqOrdering: topKBuildChildReqOrdering,
buildProvidedOrdering: topKBuildProvided,
}
funcMap[opt.ScalarGroupByOp] = funcs{
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/opt/ordering/topk.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,12 @@ func topKBuildProvided(expr memo.RelExpr, required *props.OrderingChoice) opt.Or
// TopK orders its own input, so the ordering it provides is its own.
return trimProvided(expr.(*memo.TopKExpr).Ordering.ToOrdering(), required, &expr.Relational().FuncDeps)
}

func topKBuildChildReqOrdering(
parent memo.RelExpr, required *props.OrderingChoice, childIdx int,
) props.OrderingChoice {
// If Top K has an input ordering to impose on its child for partial order
// optimizations, then require the child to have that ordering.
topK := parent.(*memo.TopKExpr)
return topK.PartialOrdering
}
66 changes: 57 additions & 9 deletions pkg/sql/opt/xform/coster.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,9 +570,14 @@ func (c *coster) ComputeCost(candidate memo.RelExpr, required *physical.Required

func (c *coster) computeTopKCost(topk *memo.TopKExpr, required *physical.Required) memo.Cost {
rel := topk.Relational()
inputRowCount := topk.Input.Relational().Stats.RowCount
outputRowCount := rel.Stats.RowCount

inputRowCount := topk.Input.Relational().Stats.RowCount
if !required.Ordering.Any() {
// When there is a partial ordering of the input rows' sort columns, we may
// be able to reduce the number of input rows needed to find the top K rows.
inputRowCount = topKInputLimitHint(c.mem, topk, inputRowCount, outputRowCount, float64(topk.K))
}
// Add the cost of sorting.
// Start with a cost of storing each row; TopK sort only stores K rows in a
// max heap.
Expand Down Expand Up @@ -1256,25 +1261,38 @@ func (c *coster) computeProjectSetCost(projectSet *memo.ProjectSetExpr) memo.Cos
return cost
}

// countSegments calculates the number of segments that will be used to execute
// the sort. If no input ordering is provided, there's only one segment.
func (c *coster) countSegments(sort *memo.SortExpr) float64 {
if sort.InputOrdering.Any() {
return 1
// getOrderingColStats returns the column statistic for the columns in the
// OrderingChoice oc. The OrderingChoice should be a member of expr. We include
// the Memo as an argument so that functions that call this function can be used
// both inside and outside the coster.
func getOrderingColStats(
mem *memo.Memo, expr memo.RelExpr, oc props.OrderingChoice,
) *props.ColumnStatistic {
if oc.Any() {
return nil
}
stats := sort.Relational().Stats
orderedCols := sort.InputOrdering.ColSet()
stats := expr.Relational().Stats
orderedCols := oc.ColSet()
orderedStats, ok := stats.ColStats.Lookup(orderedCols)
if !ok {
orderedStats, ok = c.mem.RequestColStat(sort, orderedCols)
orderedStats, ok = mem.RequestColStat(expr, orderedCols)
if !ok {
// I don't think we can ever get here. Since we don't allow the memo
// to be optimized twice, the coster should never be used after
// logPropsBuilder.clear() is called.
panic(errors.AssertionFailedf("could not request the stats for ColSet %v", orderedCols))
}
}
return orderedStats
}

// countSegments calculates the number of segments that will be used to execute
// the sort. If no input ordering is provided, there's only one segment.
func (c *coster) countSegments(sort *memo.SortExpr) float64 {
orderedStats := getOrderingColStats(c.mem, sort, sort.InputOrdering)
if orderedStats == nil {
return 1
}
return orderedStats.DistinctCount
}

Expand Down Expand Up @@ -1572,6 +1590,36 @@ func lookupJoinInputLimitHint(inputRowCount, outputRowCount, outputLimitHint flo
return math.Min(inputRowCount, expectedLookupCount)
}

// topKInputLimitHint calculates an appropriate limit hint for the input
// to a Top K expression when the input is partially sorted.
func topKInputLimitHint(
mem *memo.Memo, topk *memo.TopKExpr, inputRowCount, outputRowCount, K float64,
) float64 {
if outputRowCount == 0 {
return 0
}
orderedStats := getOrderingColStats(mem, topk, topk.PartialOrdering)
if orderedStats == nil {
return inputRowCount
}

// In order to find the top K rows of a partially sorted input, we estimate
// the number of rows we'll need to ingest by rounding up the nearest multiple
// of the number of rows per distinct values to K. For example, let's say we
// have 2000 input rows, 100 distinct values, and a K of 10. If we assume that
// each distinct value is found in the same number of input rows, each
// distinct value has 2000/100 = 20 rowsPerDistinctVal. Processing the rows
// for one distinct value is sufficient to find the top K 10 rows. If K were
// 50 instead, we would need to process more distinct values to find the top
// K, so we need to multiply the rowsPerDistinctVal by the minimum number of
// distinct values to process, which we can find by dividing K by the rows per
// distinct values and rounding up, or ceil(50/20) = 3. So if K is 50, we need
// to process approximately 3 * 20 = 60 rows to find the top 50 rows.
rowsPerDistinctVal := inputRowCount / orderedStats.DistinctCount
expectedRows := math.Ceil(K/rowsPerDistinctVal) * rowsPerDistinctVal
return math.Min(inputRowCount, expectedRows)
}

// lookupExprCost accounts for the extra CPU cost of the lookupExpr.
func lookupExprCost(join memo.RelExpr) memo.Cost {
lookupExpr, ok := join.(*memo.LookupJoinExpr)
Expand Down
46 changes: 8 additions & 38 deletions pkg/sql/opt/xform/groupby_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,41 +186,11 @@ func (c *CustomFuncs) GenerateStreamingGroupBy(
orders := ordering.DeriveInterestingOrderings(input)
intraOrd := private.Ordering
for _, ord := range orders {
o := ord.ToOrdering()
// We are looking for a prefix of o that satisfies the intra-group ordering
// if we ignore grouping columns.
oIdx, intraIdx := 0, 0
for ; oIdx < len(o); oIdx++ {
oCol := o[oIdx].ID()
if private.GroupingCols.Contains(oCol) || intraOrd.Optional.Contains(oCol) {
// Grouping or optional column.
continue
}

if intraIdx < len(intraOrd.Columns) &&
intraOrd.Group(intraIdx).Contains(oCol) &&
intraOrd.Columns[intraIdx].Descending == o[oIdx].Descending() {
// Column matches the one in the ordering.
intraIdx++
continue
}
break
}
if oIdx == 0 || intraIdx < len(intraOrd.Columns) {
// No match.
newOrd, fullPrefix, found := getPrefixFromOrdering(ord.ToOrdering(), intraOrd, input,
func(id opt.ColumnID) bool { return private.GroupingCols.Contains(id) })
if !found || !fullPrefix {
continue
}
o = o[:oIdx]

var newOrd props.OrderingChoice
newOrd.FromOrderingWithOptCols(o, opt.ColSet{})

// Simplify the ordering according to the input's FDs. Note that this is not
// necessary for correctness because buildChildPhysicalProps would do it
// anyway, but doing it here once can make things more efficient (and we may
// generate fewer expressions if some of these orderings turn out to be
// equivalent).
newOrd.Simplify(&input.Relational().FuncDeps)

newPrivate := *private
newPrivate.Ordering = newOrd
Expand Down Expand Up @@ -420,6 +390,8 @@ func (c *CustomFuncs) GenerateLimitedGroupByScans(
// Iterate over all non-inverted and non-partial secondary indexes.
var pkCols opt.ColSet
var iter scanIndexIter
var sb indexScanBuilder
sb.Init(c, sp.Table)
iter.Init(c.e.evalCtx, c.e.f, c.e.mem, &c.im, sp, nil /* filters */, rejectPrimaryIndex|rejectInvertedIndexes)
iter.ForEach(func(index cat.Index, filters memo.FiltersExpr, indexCols opt.ColSet, isCovering bool, constProj memo.ProjectionsExpr) {
// The iterator only produces pseudo-partial indexes (the predicate is
Expand Down Expand Up @@ -463,13 +435,11 @@ func (c *CustomFuncs) GenerateLimitedGroupByScans(
// If the index is not covering, scan the needed index columns plus
// primary key columns.
newScanPrivate.Cols.UnionWith(pkCols)
input := c.e.f.ConstructScan(&newScanPrivate)
sb.SetScan(&newScanPrivate)
// Construct an IndexJoin operator that provides the columns missing from
// the index.
input = c.e.f.ConstructIndexJoin(input, &memo.IndexJoinPrivate{
Table: sp.Table,
Cols: sp.Cols,
})
sb.AddIndexJoin(sp.Cols)
input := sb.BuildNewExpr()
// Reconstruct the GroupBy and Limit so the new expression in the memo is
// equivalent.
input = c.e.f.ConstructGroupBy(input, aggs, gp)
Expand Down
55 changes: 55 additions & 0 deletions pkg/sql/opt/xform/index_scan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,61 @@ func (b *indexScanBuilder) AddIndexJoin(cols opt.ColSet) {
}
}

// BuildNewExpr constructs the final expression by composing together the various
// expressions that were specified by previous calls to various add methods.
// It is similar to Build, but does not add the expression to the memo group.
// The output expression must be used as input to another memo expression, as
// the output expression is already interned.
// TODO(harding): Refactor with Build to avoid code duplication.
func (b *indexScanBuilder) BuildNewExpr() (output memo.RelExpr) {
// 1. Only scan.
output = b.f.ConstructScan(&b.scanPrivate)
if !b.hasConstProjections() && !b.hasInnerFilters() && !b.hasInvertedFilter() && !b.hasIndexJoin() {
return
}

// 2. Wrap input in a Project if constant projections were added.
if b.hasConstProjections() {
output = b.f.ConstructProject(output, b.constProjections, b.scanPrivate.Cols)
if !b.hasInnerFilters() && !b.hasInvertedFilter() && !b.hasIndexJoin() {
return
}
}

// 3. Wrap input in inner filter if it was added.
if b.hasInnerFilters() {
output = b.f.ConstructSelect(output, b.innerFilters)
if !b.hasInvertedFilter() && !b.hasIndexJoin() {
return
}
}

// 4. Wrap input in inverted filter if it was added.
if b.hasInvertedFilter() {
output = b.f.ConstructInvertedFilter(output, &b.invertedFilterPrivate)
if !b.hasIndexJoin() {
return
}
}

// 5. Wrap input in index join if it was added.
if b.hasIndexJoin() {
output = b.f.ConstructIndexJoin(output, &b.indexJoinPrivate)
if !b.hasOuterFilters() {
return
}
}

// 6. Wrap input in outer filter (which must exist at this point).
if !b.hasOuterFilters() {
// indexJoinDef == 0: outerFilters == 0 handled by #1-4 above.
// indexJoinDef != 0: outerFilters == 0 handled by #5 above.
panic(errors.AssertionFailedf("outer filter cannot be 0 at this point"))
}
output = b.f.ConstructSelect(output, b.outerFilters)
return
}

// Build constructs the final memo expression by composing together the various
// expressions that were specified by previous calls to various add methods.
func (b *indexScanBuilder) Build(grp memo.RelExpr) {
Expand Down
Loading

0 comments on commit 834de68

Please sign in to comment.