Skip to content

Commit

Permalink
sql: use Datums for offsets of window frames
Browse files Browse the repository at this point in the history
Adds support for storing non-integer offsets
of window frames necessary for RANGE mode.
Also checks that there is ordering on a single
column that window function is computed over
if in RANGE mode.

Incremental change towards: cockroachdb#27100.

Release note: None
  • Loading branch information
yuzefovich committed Aug 1, 2018
1 parent 9b1ed6a commit 7deffdd
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 52 deletions.
20 changes: 16 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/window
Original file line number Diff line number Diff line change
Expand Up @@ -1922,14 +1922,26 @@ INSERT INTO products (group_name, product_name, price, priceInt, priceFloat) VAL
('Tablet', 'Samsung', 200, 200, 200)

statement error cannot copy window "w" because it has a frame clause
SELECT price, max(price) OVER (w ORDER BY price) AS max_price FROM products WINDOW w AS (PARTITION BY price ROWS UNBOUNDED PRECEDING)
SELECT avg(price) OVER (w) FROM products WINDOW w AS (ROWS 1 PRECEDING)

statement error cannot copy window "w" because it has a frame clause
SELECT avg(price) OVER (w ORDER BY price) FROM products WINDOW w AS (ROWS 1 PRECEDING)

statement error frame starting offset must not be null
SELECT avg(price) OVER (ROWS NULL PRECEDING) FROM products

statement error frame starting offset must not be null
SELECT avg(price) OVER (ROWS BETWEEN NULL PRECEDING AND 1 FOLLOWING) FROM products

statement error frame starting offset must not be negative
SELECT price, avg(price) OVER (PARTITION BY price ROWS -1 PRECEDING) AS avg_price FROM products

statement error frame starting offset must not be negative
SELECT price, avg(price) OVER w AS avg_price FROM products WINDOW w AS (PARTITION BY price ROWS -1 PRECEDING)

statement error frame ending offset must not be null
SELECT avg(price) OVER (ROWS BETWEEN 1 PRECEDING AND NULL FOLLOWING) FROM products

statement error frame ending offset must not be negative
SELECT price, avg(price) OVER (PARTITION BY price ROWS BETWEEN 1 FOLLOWING AND -1 FOLLOWING) AS avg_price FROM products

Expand All @@ -1942,19 +1954,19 @@ SELECT product_name, price, min(price) OVER (PARTITION BY group_name ROWS BETWEE
statement error incompatible window frame start type: decimal
SELECT avg(price) OVER (PARTITION BY group_name ROWS 1.5 PRECEDING) AS avg_price FROM products

statement error argument of window frame start must be type int, not type decimal
statement error incompatible window frame start type: decimal
SELECT avg(price) OVER w AS avg_price FROM products WINDOW w AS (PARTITION BY group_name ROWS 1.5 PRECEDING)

statement error incompatible window frame start type: decimal
SELECT avg(price) OVER (PARTITION BY group_name ROWS BETWEEN 1.5 PRECEDING AND UNBOUNDED FOLLOWING) AS avg_price FROM products

statement error argument of window frame start must be type int, not type decimal
statement error incompatible window frame start type: decimal
SELECT avg(price) OVER w AS avg_price FROM products WINDOW w AS (PARTITION BY group_name ROWS BETWEEN 1.5 PRECEDING AND UNBOUNDED FOLLOWING)

statement error incompatible window frame end type: decimal
SELECT avg(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND 1.5 FOLLOWING) AS avg_price FROM products

statement error argument of window frame end must be type int, not type decimal
statement error incompatible window frame end type: decimal
SELECT avg(price) OVER w AS avg_price FROM products WINDOW w AS (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND 1.5 FOLLOWING)

query TRT
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/logictest/testdata/planner_test/window
Original file line number Diff line number Diff line change
Expand Up @@ -200,3 +200,15 @@ sort · ·
└── scan · · (k int, v int, w[omitted] int, f[omitted] float, d decimal, s[omitted] string, b[omitted] bool) k!=NULL; key(k)
· table kv@primary · ·
· spans ALL · ·

query TTTTT
EXPLAIN (TYPES) SELECT max(k) OVER (ROWS 1 PRECEDING) FROM kv
----
window · · (max int) ·
│ window 0 (max((k)[int]) OVER (ROWS (1)[int] PRECEDING))[int] · ·
│ render 0 (max((k)[int]) OVER (ROWS (1)[int] PRECEDING))[int] · ·
└── render · · (k int) k!=NULL; key(k)
│ render 0 (k)[int] · ·
└── scan · · (k int, v[omitted] int, w[omitted] int, f[omitted] float, d[omitted] decimal, s[omitted] string, b[omitted] bool) k!=NULL; key(k)
· table kv@primary · ·
· spans ALL · ·
12 changes: 6 additions & 6 deletions pkg/sql/sem/builtins/window_frame_builtins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func testSlidingWindow(t *testing.T, count int) {

func testMin(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun) {
for offset := 0; offset < maxOffset; offset += int(rand.Int31n(maxOffset / 10)) {
wfr.StartBoundOffset = offset
wfr.EndBoundOffset = offset
wfr.StartBoundOffset = tree.NewDInt(tree.DInt(offset))
wfr.EndBoundOffset = tree.NewDInt(tree.DInt(offset))
min := &slidingWindowFunc{}
min.sw = makeSlidingWindow(evalCtx, func(evalCtx *tree.EvalContext, a, b tree.Datum) int {
return -a.Compare(evalCtx, b)
Expand Down Expand Up @@ -111,8 +111,8 @@ func testMin(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun)

func testMax(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun) {
for offset := 0; offset < maxOffset; offset += int(rand.Int31n(maxOffset / 10)) {
wfr.StartBoundOffset = offset
wfr.EndBoundOffset = offset
wfr.StartBoundOffset = tree.NewDInt(tree.DInt(offset))
wfr.EndBoundOffset = tree.NewDInt(tree.DInt(offset))
max := &slidingWindowFunc{}
max.sw = makeSlidingWindow(evalCtx, func(evalCtx *tree.EvalContext, a, b tree.Datum) int {
return a.Compare(evalCtx, b)
Expand Down Expand Up @@ -146,8 +146,8 @@ func testMax(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun)

func testSumAndAvg(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun) {
for offset := 0; offset < maxOffset; offset += int(rand.Int31n(maxOffset / 10)) {
wfr.StartBoundOffset = offset
wfr.EndBoundOffset = offset
wfr.StartBoundOffset = tree.NewDInt(tree.DInt(offset))
wfr.EndBoundOffset = tree.NewDInt(tree.DInt(offset))
sum := &slidingWindowSumFunc{agg: &intSumAggregate{}}
avg := &avgWindowFunc{sum: slidingWindowSumFunc{agg: &intSumAggregate{}}}
for wfr.RowIdx = 0; wfr.RowIdx < wfr.PartitionSize(); wfr.RowIdx++ {
Expand Down
64 changes: 49 additions & 15 deletions pkg/sql/sem/tree/type_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,21 +819,8 @@ func (expr *FuncExpr) TypeCheck(ctx *SemaContext, desired types.T) (TypedExpr, e
expr.WindowDef.OrderBy[i].Expr = typedOrderBy
}
if expr.WindowDef.Frame != nil {
bounds := expr.WindowDef.Frame.Bounds
startBound, endBound := bounds.StartBound, bounds.EndBound
if startBound.OffsetExpr != nil {
typedStartOffsetExpr, err := typeCheckAndRequire(ctx, startBound.OffsetExpr, types.Int, "window frame start")
if err != nil {
return nil, err
}
startBound.OffsetExpr = typedStartOffsetExpr
}
if endBound != nil && endBound.OffsetExpr != nil {
typedEndOffsetExpr, err := typeCheckAndRequire(ctx, endBound.OffsetExpr, types.Int, "window frame end")
if err != nil {
return nil, err
}
endBound.OffsetExpr = typedEndOffsetExpr
if err := expr.WindowDef.Frame.TypeCheck(ctx, expr.WindowDef); err != nil {
return nil, err
}
}
} else {
Expand Down Expand Up @@ -2092,3 +2079,50 @@ func (v stripFuncsVisitor) VisitPre(expr Expr) (recurse bool, newExpr Expr) {
}

func (stripFuncsVisitor) VisitPost(expr Expr) Expr { return expr }

// TypeCheck checks that offsets of the window frame (if present) are of the
// appropriate type.
func (f *WindowFrame) TypeCheck(ctx *SemaContext, windowDef *WindowDef) error {
bounds := f.Bounds
startBound, endBound := bounds.StartBound, bounds.EndBound
var requiredType types.T
switch f.Mode {
case ROWS:
// In ROWS mode, offsets must be non-null, non-negative integers. Non-nullity
// and non-negativity will be checked later.
requiredType = types.Int
case RANGE:
// In RANGE mode, offsets must be non-null and non-negative datums of a type
// dependent on the type of the column over which window function is computed.
// Non-nullity and non-negativity will be checked later.
if startBound.OffsetExpr != nil || (endBound != nil && endBound.OffsetExpr != nil) {
// At least one of the bounds is of type `value PRECEDING` or 'value FOLLOWING'.
// We require ordering on a single column.
if len(windowDef.OrderBy) != 1 {
return pgerror.NewErrorf(pgerror.CodeWindowingError, "RANGE mode requires that the ORDER BY clause specify exactly one column")
}
requiredType = windowDef.OrderBy[0].Expr.(TypedExpr).ResolvedType()
if types.IsDateTimeType(requiredType) {
// Spec: for datetime ordering columns, the required type is an 'interval'.
requiredType = types.Interval
}
}
default:
panic("unexpected WindowFrameMode")
}
if startBound.OffsetExpr != nil {
typedStartOffsetExpr, err := typeCheckAndRequire(ctx, startBound.OffsetExpr, requiredType, "window frame start")
if err != nil {
return err
}
startBound.OffsetExpr = typedStartOffsetExpr
}
if endBound != nil && endBound.OffsetExpr != nil {
typedEndOffsetExpr, err := typeCheckAndRequire(ctx, endBound.OffsetExpr, requiredType, "window frame end")
if err != nil {
return err
}
endBound.OffsetExpr = typedEndOffsetExpr
}
return nil
}
16 changes: 10 additions & 6 deletions pkg/sql/sem/tree/window_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type WindowFrameRun struct {
ArgIdxStart int // the index which arguments to the window function begin
ArgCount int // the number of window function arguments
Frame *WindowFrame // If non-nil, Frame represents the frame specification of this window. If nil, default frame is used.
StartBoundOffset int // TODO(yuzefovich): interval offsets like '10 days' PRECEDING need to be supported.
EndBoundOffset int
StartBoundOffset Datum
EndBoundOffset Datum

// changes for each row (each call to WindowFunc.Add)
RowIdx int // the current row index
Expand Down Expand Up @@ -76,15 +76,17 @@ func (wfr WindowFrameRun) FrameStartIdx() int {
case UnboundedPreceding:
return 0
case ValuePreceding:
idx := wfr.RowIdx - wfr.StartBoundOffset
offset := MustBeDInt(wfr.StartBoundOffset)
idx := wfr.RowIdx - int(offset)
if idx < 0 {
idx = 0
}
return idx
case CurrentRow:
return wfr.RowIdx
case ValueFollowing:
idx := wfr.RowIdx + wfr.StartBoundOffset
offset := MustBeDInt(wfr.StartBoundOffset)
idx := wfr.RowIdx + int(offset)
if idx >= wfr.PartitionSize() {
idx = wfr.unboundedFollowing()
}
Expand Down Expand Up @@ -142,15 +144,17 @@ func (wfr WindowFrameRun) FrameEndIdx() int {
}
switch wfr.Frame.Bounds.EndBound.BoundType {
case ValuePreceding:
idx := wfr.RowIdx - wfr.EndBoundOffset + 1
offset := MustBeDInt(wfr.EndBoundOffset)
idx := wfr.RowIdx - int(offset) + 1
if idx < 0 {
idx = 0
}
return idx
case CurrentRow:
return wfr.RowIdx + 1
case ValueFollowing:
idx := wfr.RowIdx + wfr.EndBoundOffset + 1
offset := MustBeDInt(wfr.EndBoundOffset)
idx := wfr.RowIdx + int(offset) + 1
if idx >= wfr.PartitionSize() {
idx = wfr.unboundedFollowing()
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/sem/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,24 @@ func IsValidArrayElementType(t T) bool {
return true
}
}

// IsDateTimeType returns true if the T is
// date- or time-related type.
func IsDateTimeType(t T) bool {
switch t {
case Date:
return true
case Time:
return true
case TimeTZ:
return true
case Timestamp:
return true
case TimestampTZ:
return true
case Interval:
return true
default:
return false
}
}
85 changes: 64 additions & 21 deletions pkg/sql/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)
Expand Down Expand Up @@ -337,21 +338,8 @@ func (p *planner) constructWindowDefinitions(

// Validate frame of the window definition if present.
if windowDef.Frame != nil {
bounds := windowDef.Frame.Bounds
startBound, endBound := bounds.StartBound, bounds.EndBound
if startBound.OffsetExpr != nil {
typedStartOffsetExpr, err := tree.TypeCheckAndRequire(startBound.OffsetExpr, &p.semaCtx, types.Int, "window frame start")
if err != nil {
return err
}
startBound.OffsetExpr = typedStartOffsetExpr
}
if endBound != nil && endBound.OffsetExpr != nil {
typedEndOffsetExpr, err := tree.TypeCheckAndRequire(endBound.OffsetExpr, &p.semaCtx, types.Int, "window frame end")
if err != nil {
return err
}
endBound.OffsetExpr = typedEndOffsetExpr
if err := windowDef.Frame.TypeCheck(&p.semaCtx, &windowDef); err != nil {
return err
}
}

Expand Down Expand Up @@ -664,29 +652,68 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte
// OffsetExpr's must be integer expressions not containing any variables, aggregate functions, or window functions,
// so we need to make sure these expressions are evaluated before using offsets.
bounds := frameRun.Frame.Bounds
startBound, endBound := bounds.StartBound, bounds.EndBound
var requiredType types.T
switch frameRun.Frame.Mode {
case tree.ROWS:
// In ROWS mode, offsets must be non-null, non-negative integers.
requiredType = types.Int
case tree.RANGE:
// In RANGE mode, offsets must be non-null and non-negative datums of
// a type dependent on the type of the ordering column.
if startBound.OffsetExpr != nil || (endBound != nil && endBound.OffsetExpr != nil) {
// At least one of the bounds is of type `value' PRECEDING or 'value'
// FOLLOWING'. We require ordering on a single column.
if len(windowFn.columnOrdering) != 1 {
return pgerror.NewErrorf(pgerror.CodeWindowingError, "RANGE mode requires that the ORDER BY clause specify exactly one column")
}
// Assumption: renderNode is a parent of windowNode, and its renders
// contain IndexedVars representing the input columns.
switch r := n.plan.(type) {
case *renderNode:
ordCol, ok := r.render[windowFn.columnOrdering[0].ColIdx].(*tree.IndexedVar)
if !ok {
panic("unexpected: render of the ordering column is not an IndexedVar")
}
requiredType = ordCol.ResolvedType()
if types.IsDateTimeType(requiredType) {
// Spec: for datetime ordering columns, the required type is an 'interval'.
requiredType = types.Interval
}
default:
panic("unexpected: parent of windowNode is not renderNode")
}
}
default:
panic("unexpected WindowFrameMode")
}
if bounds.StartBound.OffsetExpr != nil {
typedStartOffset := bounds.StartBound.OffsetExpr.(tree.TypedExpr)
dStartOffset, err := typedStartOffset.Eval(evalCtx)
if err != nil {
return err
}
startOffset := int(tree.MustBeDInt(dStartOffset))
if startOffset < 0 {
if dStartOffset == tree.DNull {
return pgerror.NewErrorf(pgerror.CodeNullValueNotAllowedError, "frame starting offset must not be null")
}
if isNegative(evalCtx, dStartOffset) {
return pgerror.NewErrorf(pgerror.CodeInvalidParameterValueError, "frame starting offset must not be negative")
}
frameRun.StartBoundOffset = startOffset
frameRun.StartBoundOffset = dStartOffset
}
if bounds.EndBound != nil && bounds.EndBound.OffsetExpr != nil {
typedEndOffset := bounds.EndBound.OffsetExpr.(tree.TypedExpr)
dEndOffset, err := typedEndOffset.Eval(evalCtx)
if err != nil {
return err
}
endOffset := int(tree.MustBeDInt(dEndOffset))
if endOffset < 0 {
if dEndOffset == tree.DNull {
return pgerror.NewErrorf(pgerror.CodeNullValueNotAllowedError, "frame ending offset must not be null")
}
if isNegative(evalCtx, dEndOffset) {
return pgerror.NewErrorf(pgerror.CodeInvalidParameterValueError, "frame ending offset must not be negative")
}
frameRun.EndBoundOffset = endOffset
frameRun.EndBoundOffset = dEndOffset
}
}

Expand Down Expand Up @@ -845,6 +872,22 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte
return nil
}

// isNegative returns whether offset is negative.
func isNegative(evalCtx *tree.EvalContext, offset tree.Datum) bool {
switch o := offset.(type) {
case *tree.DInt:
return *o < 0
case *tree.DDecimal:
return o.Negative
case *tree.DFloat:
return *o < 0
case *tree.DInterval:
return o.Compare(evalCtx, &tree.DInterval{Duration: duration.Duration{}}) < 0
default:
panic("unexpected offset type")
}
}

// populateValues populates n.run.values with final datum values after computing
// window result values in n.run.windowValues.
func (n *windowNode) populateValues(ctx context.Context, evalCtx *tree.EvalContext) error {
Expand Down

0 comments on commit 7deffdd

Please sign in to comment.