Skip to content

Commit

Permalink
colexec: optimize min and max window functions with default exclusion
Browse files Browse the repository at this point in the history
This commit modifies the 'min' and 'max' aggregate window functions
to implement the `slidingWindowAggregateFunc` interface, which allows
them to be used in a sliding window context. However, this is only
usable when the window frame never shrinks - e.g. it always contains
all rows from the previous frame.

This commit also provides implementations of `min` and `max` for use
when the window frame can shrink. The indices of the 'next best'
minimum or maximum values are stored in a priority queue that is
updated for each row. Using the priority queue allows the `min` and
`max` operators to avoid fully aggregating over the window frame
even when the previous best value goes out of scope. Note that this
implementation currently does not handle the case of non-default
exclusion clause, in which case we must fall back to the quadratic
approach.

Release note: None
  • Loading branch information
DrewKimball committed Aug 13, 2021
1 parent e2ed7d7 commit cd63a65
Show file tree
Hide file tree
Showing 18 changed files with 6,084 additions and 991 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,7 @@ EXECGEN_TARGETS = \
pkg/sql/colexec/colexecwindow/lag.eg.go \
pkg/sql/colexec/colexecwindow/last_value.eg.go \
pkg/sql/colexec/colexecwindow/lead.eg.go \
pkg/sql/colexec/colexecwindow/min_max_removable_agg.eg.go \
pkg/sql/colexec/colexecwindow/ntile.eg.go \
pkg/sql/colexec/colexecwindow/nth_value.eg.go \
pkg/sql/colexec/colexecwindow/range_offset_handler.eg.go \
Expand Down
23 changes: 16 additions & 7 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,7 @@ func NewColOperator(
opName := opNamePrefix + strings.ToLower(wf.Func.AggregateFunc.String())
result.finishBufferedWindowerArgs(
ctx, flowCtx, windowArgs, opName, spec.ProcessorID, factory, true /* needsBuffer */)
aggType := *wf.Func.AggregateFunc
switch *wf.Func.AggregateFunc {
case execinfrapb.CountRows:
// count_rows has a specialized implementation.
Expand All @@ -1375,20 +1376,28 @@ func NewColOperator(
colIdx[i] = uint32(i)
}
aggregations := []execinfrapb.AggregatorSpec_Aggregation{{
Func: *wf.Func.AggregateFunc,
Func: aggType,
ColIdx: colIdx,
}}
semaCtx := flowCtx.TypeResolverFactory.NewSemaContext(evalCtx.Txn)
aggArgs.Constructors, aggArgs.ConstArguments, aggArgs.OutputTypes, err =
colexecagg.ProcessAggregations(evalCtx, semaCtx, aggregations, argTypes)
aggFnsAlloc, _, toClose, err := colexecagg.NewAggregateFuncsAlloc(
&aggArgs, aggregations, 1 /* allocSize */, colexecagg.WindowAggKind,
)
if err != nil {
colexecerror.InternalError(err)
var toClose colexecop.Closers
var aggFnsAlloc *colexecagg.AggregateFuncsAlloc
if (aggType != execinfrapb.Min && aggType != execinfrapb.Max) ||
wf.Frame.Exclusion != execinfrapb.WindowerSpec_Frame_NO_EXCLUSION ||
!colexecwindow.WindowFrameCanShrink(wf.Frame, &wf.Ordering) {
// Min and max window functions have specialized implementations
// when the frame can shrink and has a default exclusion clause.
aggFnsAlloc, _, toClose, err = colexecagg.NewAggregateFuncsAlloc(
&aggArgs, aggregations, 1 /* allocSize */, colexecagg.WindowAggKind,
)
if err != nil {
colexecerror.InternalError(err)
}
}
result.Root = colexecwindow.NewWindowAggregatorOperator(
windowArgs, wf.Frame, &wf.Ordering, argIdxs,
windowArgs, aggType, wf.Frame, &wf.Ordering, argIdxs,
aggArgs.OutputTypes[0], aggFnsAlloc, toClose)
}
} else {
Expand Down
Loading

0 comments on commit cd63a65

Please sign in to comment.