Skip to content

Commit

Permalink
sql: Add efficient min, max, sum, avg when used as window functions.
Browse files Browse the repository at this point in the history
Adds linear-time implementations of min, max, sum, and avg
(using a sliding-window approach) in place of the naive quadratic
version.

Addresses: cockroachdb#26464.

Bonus: min and max are an order of magnitude faster than PG
(when window frame doesn't include the whole partition).

Release note (performance improvement): min, max, sum, avg
now take linear time when used for aggregation as window
functions.
  • Loading branch information
yuzefovich committed Jun 27, 2018
1 parent 95ed31b commit a2c99f2
Show file tree
Hide file tree
Showing 4 changed files with 519 additions and 1 deletion.
46 changes: 45 additions & 1 deletion pkg/sql/logictest/testdata/logic_test/window
Original file line number Diff line number Diff line change
Expand Up @@ -1708,7 +1708,6 @@ Tablet iPad 700.00 NULL
Tablet Kindle Fire 150.00 NULL
Tablet Samsung 200.00 NULL


query TRRR
SELECT product_name, price, min(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS min_over_three, max(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS max_over_partition FROM products ORDER BY group_id;
----
Expand All @@ -1723,3 +1722,48 @@ Dell 800.00 700.00 1200.00
iPad 700.00 150.00 700.00
Kindle Fire 150.00 150.00 700.00
Samsung 200.00 150.00 700.00

query TTRT
SELECT group_name, product_name, price, min(price) OVER (PARTITION BY group_name ROWS CURRENT ROW) AS min_over_single_row FROM products ORDER BY group_id;
----
Smartphone Microsoft Lumia 200.00 200.00
Smartphone HTC One 400.00 400.00
Smartphone Nexus 500.00 500.00
Smartphone iPhone 900.00 900.00
Laptop HP Elite 1200.00 1200.00
Laptop Lenovo Thinkpad 700.00 700.00
Laptop Sony VAIO 700.00 700.00
Laptop Dell 800.00 800.00
Tablet iPad 700.00 700.00
Tablet Kindle Fire 150.00 150.00
Tablet Samsung 200.00 200.00

query TTRR
SELECT group_name, product_name, price, avg(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) AS running_avg FROM products ORDER BY group_id;
----
Smartphone Microsoft Lumia 200.00 600.00
Smartphone HTC One 400.00 700.00
Smartphone Nexus 500.00 900.00
Smartphone iPhone 900.00 NULL
Laptop HP Elite 1200.00 733.33333333333333333
Laptop Lenovo Thinkpad 700.00 750.00
Laptop Sony VAIO 700.00 800.00
Laptop Dell 800.00 NULL
Tablet iPad 700.00 175.00
Tablet Kindle Fire 150.00 200.00
Tablet Samsung 200.00 NULL

query TRRRRR
SELECT product_name, price, min(price) OVER (PARTITION BY group_name ROWS UNBOUNDED PRECEDING), max(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING), sum(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING), avg(price) OVER (PARTITION BY group_name ROWS CURRENT ROW) FROM products ORDER BY group_id;
----
Microsoft Lumia 200.00 200.00 400.00 2000.00 200.00
HTC One 400.00 200.00 500.00 2000.00 400.00
Nexus 500.00 200.00 900.00 1800.00 500.00
iPhone 900.00 200.00 900.00 1400.00 900.00
HP Elite 1200.00 1200.00 1200.00 3400.00 1200.00
Lenovo Thinkpad 700.00 700.00 1200.00 3400.00 700.00
Sony VAIO 700.00 700.00 1200.00 2200.00 700.00
Dell 800.00 700.00 1200.00 1500.00 800.00
iPad 700.00 700.00 700.00 1050.00 700.00
Kindle Fire 150.00 150.00 700.00 1050.00 150.00
Samsung 200.00 150.00 700.00 350.00 200.00
267 changes: 267 additions & 0 deletions pkg/sql/sem/builtins/window_frame_builtins.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
// Copyright 2018 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package builtins

import (
"context"

"bytes"

"fmt"

"github.com/cockroachdb/apd"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

// MaybeReplaceWithFasterImplementation returns a sliding-window based window
// function when windowFunc wraps a min, max, sum or avg aggregate; any other
// window function is handed back untouched.
func MaybeReplaceWithFasterImplementation(
	windowFunc tree.WindowFunc, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun,
) tree.WindowFunc {
	framable, isFramable := windowFunc.(*framableAggregateWindowFunc)
	if !isFramable {
		return windowFunc
	}

	// newSliding builds a min/max style window function whose deque is
	// pre-sized to the partition and ordered by the supplied comparator.
	newSliding := func(cmp func(*tree.EvalContext, tree.Datum, tree.Datum) int) tree.WindowFunc {
		return &slidingWindowFunc{
			sw: &slidingWindow{
				values: make([]indexedValue, 0, wfr.PartitionSize()),
				cmp:    cmp,
			},
		}
	}

	underlying := framable.agg.agg
	switch typed := underlying.(type) {
	case *MinAggregate:
		// Negating the comparison gives smaller datums the higher priority.
		return newSliding(func(evalCtx *tree.EvalContext, a, b tree.Datum) int {
			return -a.Compare(evalCtx, b)
		})
	case *MaxAggregate:
		return newSliding(func(evalCtx *tree.EvalContext, a, b tree.Datum) int {
			return a.Compare(evalCtx, b)
		})
	case *intSumAggregate, *decimalSumAggregate, *floatSumAggregate:
		return &slidingWindowSumFunc{agg: underlying}
	case *avgAggregate:
		// typed.agg is a sum aggregate.
		return &avgWindowFunc{sum: slidingWindowSumFunc{agg: typed.agg}}
	}
	return windowFunc
}

// indexedValue pairs a datum taken from a row with the index of that row.
type indexedValue struct {
// value is the datum read from the row.
value tree.Datum
// idx is the index of the row the datum came from.
idx int
}

// slidingWindow maintains a deque of values, each paired with its row index,
// ordered according to the cmp function:
// for Min behavior, cmp = -a.Compare(b)
// for Max behavior, cmp = a.Compare(b)
//
// It assumes that the frame bounds never move backwards, i.e. that the frame
// start and frame end indices form non-decreasing sequences.
type slidingWindow struct {
// values is the deque: add appends at the tail, removeAllBefore trims the
// head.
values []indexedValue
// cmp returns a positive number when its first datum has strictly higher
// priority than its second.
cmp func(*tree.EvalContext, tree.Datum, tree.Datum) int
}

// add pushes 'iv' onto the tail of the deque after popping every tail entry
// whose priority (as decided by cmp) is not strictly higher than that of 'iv'.
// As a result the deque stays sorted in strictly descending priority order,
// and among equal values only the one with the largest idx survives.
func (sw *slidingWindow) add(evalCtx *tree.EvalContext, iv indexedValue) {
	keep := len(sw.values)
	for keep > 0 && sw.cmp(evalCtx, sw.values[keep-1].value, iv.value) <= 0 {
		keep--
	}
	sw.values = append(sw.values[:keep], iv)
}

// removeAllBefore pops entries off the head of the deque for as long as their
// row index is smaller than 'idx'. This corresponds to moving the start of the
// frame forward to 'idx'.
func (sw *slidingWindow) removeAllBefore(idx int) {
	drop := 0
	// The 'drop < idx' bound additionally caps the number of popped entries at
	// 'idx'.
	for drop < len(sw.values) && drop < idx && sw.values[drop].idx < idx {
		drop++
	}
	sw.values = sw.values[drop:]
}

// string renders the deque from head to tail as tab-terminated
// "(value, idx)" pairs.
func (sw *slidingWindow) string() string {
	var out bytes.Buffer
	for _, entry := range sw.values {
		fmt.Fprintf(&out, "(%v, %v)\t", entry.value, entry.idx)
	}
	return out.String()
}

// slidingWindowFunc is a window function backed by a slidingWindow; its
// Compute method returns the value at the front of the deque.
type slidingWindowFunc struct {
// sw holds the candidate values for the current frame.
sw *slidingWindow
// prevEnd is the frame end index recorded by the previous Compute call.
prevEnd int
}

// Compute implements WindowFunc interface.
//
// It returns the highest-priority value (the minimum or the maximum,
// depending on the comparator of the underlying sliding window) among the
// non-NULL arguments of the rows in the current frame, or NULL when the frame
// contains no such value. It relies on the frame bounds never moving
// backwards between calls.
func (w *slidingWindowFunc) Compute(
	ctx context.Context, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun,
) (tree.Datum, error) {
	start, end := wfr.FrameStartIdx(), wfr.FrameEndIdx()

	// We need to discard all values that are no longer in the frame.
	w.sw.removeAllBefore(start)

	// We need to add all values that just entered the frame and have not been
	// added yet.
	for idx := max(w.prevEnd, start); idx < end; idx++ {
		value := wfr.ArgsByRowIdx(idx)[0]
		if value == tree.DNull {
			// A NULL can be neither the minimum nor the maximum of a frame that
			// has non-NULL values, so it never enters the deque. This also keeps
			// the result independent of how Compare orders NULL. A frame made
			// only of NULLs leaves the deque empty and is handled below.
			continue
		}
		w.sw.add(evalCtx, indexedValue{value: value, idx: idx})
	}
	w.prevEnd = end

	if len(w.sw.values) == 0 {
		// Spec: the frame is empty (or holds only NULLs), so we return NULL.
		return tree.DNull, nil
	}

	// The datum with "highest priority" within the frame is at the very front
	// of the deque.
	return w.sw.values[0].value, nil
}

// max returns the larger of the two ints.
func max(a, b int) int {
	if b >= a {
		return b
	}
	return a
}

// Close implements WindowFunc interface. It drops the reference to the
// sliding window; neither argument is used.
func (w *slidingWindowFunc) Close(ctx context.Context, evalCtx *tree.EvalContext) {
w.sw = nil
}

// slidingWindowSumFunc applies the sliding window approach to summation over
// a frame: rows entering the frame are added to a running sum and rows leaving
// it are subtracted again.
// It assumes that the frame bounds never move backwards, i.e. that the frame
// start and frame end indices form non-decreasing sequences.
type slidingWindowSumFunc struct {
// agg accumulates the running sum.
agg tree.AggregateFunc // one of the three SumAggregates
// prevStart and prevEnd are the frame bounds recorded by the previous Compute
// call.
prevStart, prevEnd int
}

// removeAllBefore subtracts from the running sum the values of all rows that
// were part of the previous frame but lie before the start of the current
// one. Subtraction is done by adding the negated value to the aggregate.
//
// It returns the first error reported by the aggregate, or an internal error
// when a non-NULL argument is not an int, a decimal or a float.
func (w *slidingWindowSumFunc) removeAllBefore(
	ctx context.Context, wfr *tree.WindowFrameRun,
) error {
	frameStart := wfr.FrameStartIdx()
	for idx := w.prevStart; idx < frameStart && idx < w.prevEnd; idx++ {
		value := wfr.ArgsByRowIdx(idx)[0]
		if value == tree.DNull {
			// NOTE(review): assumes a NULL contributed nothing to the running sum
			// when it was added, so there is nothing to subtract - confirm against
			// the sum aggregates' Add.
			continue
		}

		var err error
		switch v := value.(type) {
		case *tree.DInt:
			err = w.agg.Add(ctx, tree.NewDInt(-*v))
		case *tree.DDecimal:
			d := tree.DDecimal{}
			d.Neg(&v.Decimal)
			err = w.agg.Add(ctx, &d)
		case *tree.DFloat:
			err = w.agg.Add(ctx, tree.NewDFloat(-*v))
		default:
			return pgerror.NewErrorf(pgerror.CodeInternalError, "unexpected value %v", v)
		}
		// Only bail out on failure: every row that left the frame has to be
		// subtracted, not just the first one.
		if err != nil {
			return err
		}
	}
	return nil
}

// Compute implements WindowFunc interface.
//
// It brings the running sum in line with the current frame by subtracting the
// rows that left the frame and adding the rows that entered it, then returns
// the aggregate's result. An empty frame yields NULL. On error the returned
// datum is NULL and must be ignored.
func (w *slidingWindowSumFunc) Compute(
	ctx context.Context, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun,
) (tree.Datum, error) {
	start, end := wfr.FrameStartIdx(), wfr.FrameEndIdx()

	// We need to discard all values that are no longer in the frame.
	if err := w.removeAllBefore(ctx, wfr); err != nil {
		return tree.DNull, err
	}

	// We need to sum all values that just entered the frame and have not been
	// added yet.
	for idx := max(w.prevEnd, start); idx < end; idx++ {
		if err := w.agg.Add(ctx, wfr.ArgsByRowIdx(idx)[0]); err != nil {
			return tree.DNull, err
		}
	}

	// The bounds are recorded even for an empty frame so that the next call
	// resumes from the right position.
	w.prevStart = start
	w.prevEnd = end

	if start >= end {
		// Spec: the frame is empty, so we return NULL. The aggregate cannot be
		// asked here: it has already seen earlier rows and would report whatever
		// is left of the running sum after they were subtracted.
		return tree.DNull, nil
	}
	return w.agg.Result()
}

// Close implements WindowFunc interface. It closes the underlying sum
// aggregate; evalCtx is not used.
func (w *slidingWindowSumFunc) Close(ctx context.Context, evalCtx *tree.EvalContext) {
w.agg.Close(ctx)
}

// avgWindowFunc uses slidingWindowSumFunc to compute average over a frame:
// the sliding sum is divided by the size of the frame.
type avgWindowFunc struct {
// sum maintains the running sum over the current frame.
sum slidingWindowSumFunc
}

// Compute implements WindowFunc interface.
//
// It returns the sliding sum over the current frame divided by the frame
// size: a float for a float sum, a decimal for a decimal or an int sum. The
// result is NULL when the frame is empty or when the sum itself is NULL. A
// sum of any other type is reported as an internal error. No datum is
// returned alongside a non-nil error.
//
// NOTE(review): the divisor is the frame size, which presumably also counts
// rows whose argument is NULL - confirm that this matches the intended avg
// semantics.
func (w *avgWindowFunc) Compute(
	ctx context.Context, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun,
) (tree.Datum, error) {
	frameSize := wfr.FrameSize()
	if frameSize == 0 {
		// Spec: the frame is empty, so we return NULL.
		return tree.DNull, nil
	}

	sum, err := w.sum.Compute(ctx, evalCtx, wfr)
	if err != nil {
		return nil, err
	}
	if sum == tree.DNull {
		// There is nothing to average; propagate NULL instead of treating it as
		// an unexpected sum type below.
		return tree.DNull, nil
	}

	// divide returns total / frameSize as a decimal datum.
	divide := func(total *apd.Decimal) (tree.Datum, error) {
		var avg tree.DDecimal
		count := apd.New(int64(frameSize), 0)
		if _, err := tree.DecimalCtx.Quo(&avg.Decimal, total, count); err != nil {
			return nil, err
		}
		return &avg, nil
	}

	switch t := sum.(type) {
	case *tree.DFloat:
		return tree.NewDFloat(*t / tree.DFloat(frameSize)), nil
	case *tree.DDecimal:
		return divide(&t.Decimal)
	case *tree.DInt:
		// Integer sums are averaged as decimals so that the fractional part is
		// kept.
		dd := tree.DDecimal{}
		dd.SetCoefficient(int64(*t))
		return divide(&dd.Decimal)
	default:
		return nil, pgerror.NewErrorf(pgerror.CodeInternalError, "unexpected SUM result type: %s", t)
	}
}

// Close implements WindowFunc interface. It closes the embedded sliding sum.
func (w *avgWindowFunc) Close(ctx context.Context, evalCtx *tree.EvalContext) {
w.sum.Close(ctx, evalCtx)
}
Loading

0 comments on commit a2c99f2

Please sign in to comment.