Skip to content

Commit

Permalink
sql: use Datums for offsets of window frames
Browse files Browse the repository at this point in the history
Adds support for storing non-integer offsets
of window frames necessary for RANGE mode.
Also checks that there is ordering on a single
column that window function is computed over
if in RANGE mode.

Incremental change towards: cockroachdb#27100.

Release note: None
  • Loading branch information
yuzefovich committed Jul 31, 2018
1 parent 9b1ed6a commit 6f93fec
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 52 deletions.
11 changes: 7 additions & 4 deletions pkg/sql/logictest/testdata/logic_test/window
Original file line number Diff line number Diff line change
Expand Up @@ -1922,7 +1922,10 @@ INSERT INTO products (group_name, product_name, price, priceInt, priceFloat) VAL
('Tablet', 'Samsung', 200, 200, 200)

statement error cannot copy window "w" because it has a frame clause
SELECT price, max(price) OVER (w ORDER BY price) AS max_price FROM products WINDOW w AS (PARTITION BY price ROWS UNBOUNDED PRECEDING)
SELECT avg(price) OVER (w) FROM products WINDOW w AS (ROWS 1 PRECEDING)

statement error cannot copy window "w" because it has a frame clause
SELECT avg(price) OVER (w ORDER BY price) FROM products WINDOW w AS (ROWS 1 PRECEDING)

statement error frame starting offset must not be negative
SELECT price, avg(price) OVER (PARTITION BY price ROWS -1 PRECEDING) AS avg_price FROM products
Expand All @@ -1942,19 +1945,19 @@ SELECT product_name, price, min(price) OVER (PARTITION BY group_name ROWS BETWEE
statement error incompatible window frame start type: decimal
SELECT avg(price) OVER (PARTITION BY group_name ROWS 1.5 PRECEDING) AS avg_price FROM products

statement error argument of window frame start must be type int, not type decimal
statement error incompatible window frame start type: decimal
SELECT avg(price) OVER w AS avg_price FROM products WINDOW w AS (PARTITION BY group_name ROWS 1.5 PRECEDING)

statement error incompatible window frame start type: decimal
SELECT avg(price) OVER (PARTITION BY group_name ROWS BETWEEN 1.5 PRECEDING AND UNBOUNDED FOLLOWING) AS avg_price FROM products

statement error argument of window frame start must be type int, not type decimal
statement error incompatible window frame start type: decimal
SELECT avg(price) OVER w AS avg_price FROM products WINDOW w AS (PARTITION BY group_name ROWS BETWEEN 1.5 PRECEDING AND UNBOUNDED FOLLOWING)

statement error incompatible window frame end type: decimal
SELECT avg(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND 1.5 FOLLOWING) AS avg_price FROM products

statement error argument of window frame end must be type int, not type decimal
statement error incompatible window frame end type: decimal
SELECT avg(price) OVER w AS avg_price FROM products WINDOW w AS (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND 1.5 FOLLOWING)

query TRT
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/sem/builtins/window_frame_builtins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ func testSlidingWindow(t *testing.T, count int) {

func testMin(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun) {
for offset := 0; offset < maxOffset; offset += int(rand.Int31n(maxOffset / 10)) {
wfr.StartBoundOffset = offset
wfr.EndBoundOffset = offset
wfr.StartBoundOffset = tree.NewDInt(tree.DInt(offset))
wfr.EndBoundOffset = tree.NewDInt(tree.DInt(offset))
min := &slidingWindowFunc{}
min.sw = makeSlidingWindow(evalCtx, func(evalCtx *tree.EvalContext, a, b tree.Datum) int {
return -a.Compare(evalCtx, b)
Expand Down Expand Up @@ -111,8 +111,8 @@ func testMin(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun)

func testMax(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun) {
for offset := 0; offset < maxOffset; offset += int(rand.Int31n(maxOffset / 10)) {
wfr.StartBoundOffset = offset
wfr.EndBoundOffset = offset
wfr.StartBoundOffset = tree.NewDInt(tree.DInt(offset))
wfr.EndBoundOffset = tree.NewDInt(tree.DInt(offset))
max := &slidingWindowFunc{}
max.sw = makeSlidingWindow(evalCtx, func(evalCtx *tree.EvalContext, a, b tree.Datum) int {
return a.Compare(evalCtx, b)
Expand Down Expand Up @@ -146,8 +146,8 @@ func testMax(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun)

func testSumAndAvg(t *testing.T, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun) {
for offset := 0; offset < maxOffset; offset += int(rand.Int31n(maxOffset / 10)) {
wfr.StartBoundOffset = offset
wfr.EndBoundOffset = offset
wfr.StartBoundOffset = tree.NewDInt(tree.DInt(offset))
wfr.EndBoundOffset = tree.NewDInt(tree.DInt(offset))
sum := &slidingWindowSumFunc{agg: &intSumAggregate{}}
avg := &avgWindowFunc{sum: slidingWindowSumFunc{agg: &intSumAggregate{}}}
for wfr.RowIdx = 0; wfr.RowIdx < wfr.PartitionSize(); wfr.RowIdx++ {
Expand Down
75 changes: 60 additions & 15 deletions pkg/sql/sem/tree/type_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -819,21 +819,8 @@ func (expr *FuncExpr) TypeCheck(ctx *SemaContext, desired types.T) (TypedExpr, e
expr.WindowDef.OrderBy[i].Expr = typedOrderBy
}
if expr.WindowDef.Frame != nil {
bounds := expr.WindowDef.Frame.Bounds
startBound, endBound := bounds.StartBound, bounds.EndBound
if startBound.OffsetExpr != nil {
typedStartOffsetExpr, err := typeCheckAndRequire(ctx, startBound.OffsetExpr, types.Int, "window frame start")
if err != nil {
return nil, err
}
startBound.OffsetExpr = typedStartOffsetExpr
}
if endBound != nil && endBound.OffsetExpr != nil {
typedEndOffsetExpr, err := typeCheckAndRequire(ctx, endBound.OffsetExpr, types.Int, "window frame end")
if err != nil {
return nil, err
}
endBound.OffsetExpr = typedEndOffsetExpr
if err := expr.WindowDef.Frame.TypeCheck(ctx, expr.WindowDef); err != nil {
return nil, err
}
}
} else {
Expand Down Expand Up @@ -2092,3 +2079,61 @@ func (v stripFuncsVisitor) VisitPre(expr Expr) (recurse bool, newExpr Expr) {
}

func (stripFuncsVisitor) VisitPost(expr Expr) Expr { return expr }

// TypeCheck checks that offsets of the window frame (if present) are
// of the appropriate type.
func (f *WindowFrame) TypeCheck(ctx *SemaContext, windowDef *WindowDef) error {
bounds := f.Bounds
startBound, endBound := bounds.StartBound, bounds.EndBound
var requiredType types.T
switch f.Mode {
case ROWS:
// In ROWS mode, offsets must be non-null, non-negative integers.
// Non-negativity will be checked later.
requiredType = types.Int
case RANGE:
// In RANGE mode, offsets must be non-null and non-negative.
// Non-negativity will be checked later.
if startBound.OffsetExpr != nil || (endBound != nil && endBound.OffsetExpr != nil) {
// At least one of the bounds is of type `value PRECEDING` or 'value FOLLOWING'.
// We require ordering on the single column that is the first argument
// to a window function that has this window frame. If the ordering is
// on the different column, we'll catch it later.
// TODO(yuzefovich): this behavior, however, might produce different error:
// for example, computing average over INT column, but the ordering
// is on STRING - we'll get 'incompatible types' error whereas the correct
// error would be 'ordering on a different column'.
if len(windowDef.OrderBy) != 1 {
return pgerror.NewErrorf(pgerror.CodeWindowingError, "RANGE mode requires that the ORDER BY clause specify exactly one column")
}
requiredType = windowDef.OrderBy[0].Expr.(TypedExpr).ResolvedType()
if types.IsDateTimeType(requiredType) {
// Spec: for datetime ordering columns, the required type is an 'interval'.
requiredType = types.Interval
}
}
default:
panic("unexpected WindowFrameMode")
}
if startBound.OffsetExpr != nil {
typedStartOffsetExpr, err := typeCheckAndRequire(ctx, startBound.OffsetExpr, requiredType, "window frame start")
if err != nil {
return err
}
startBound.OffsetExpr = typedStartOffsetExpr
if startBound.OffsetExpr == DNull {
return pgerror.NewErrorf(pgerror.CodeWindowingError, "frame starting offset must be non-null")
}
}
if endBound != nil && endBound.OffsetExpr != nil {
typedEndOffsetExpr, err := typeCheckAndRequire(ctx, endBound.OffsetExpr, requiredType, "window frame end")
if err != nil {
return err
}
endBound.OffsetExpr = typedEndOffsetExpr
if endBound.OffsetExpr == DNull {
return pgerror.NewErrorf(pgerror.CodeWindowingError, "frame ending offset must be non-null")
}
}
return nil
}
16 changes: 10 additions & 6 deletions pkg/sql/sem/tree/window_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ type WindowFrameRun struct {
ArgIdxStart int // the index which arguments to the window function begin
ArgCount int // the number of window function arguments
Frame *WindowFrame // If non-nil, Frame represents the frame specification of this window. If nil, default frame is used.
StartBoundOffset int // TODO(yuzefovich): interval offsets like '10 days' PRECEDING need to be supported.
EndBoundOffset int
StartBoundOffset Datum
EndBoundOffset Datum

// changes for each row (each call to WindowFunc.Add)
RowIdx int // the current row index
Expand Down Expand Up @@ -76,15 +76,17 @@ func (wfr WindowFrameRun) FrameStartIdx() int {
case UnboundedPreceding:
return 0
case ValuePreceding:
idx := wfr.RowIdx - wfr.StartBoundOffset
offset := MustBeDInt(wfr.StartBoundOffset)
idx := wfr.RowIdx - int(offset)
if idx < 0 {
idx = 0
}
return idx
case CurrentRow:
return wfr.RowIdx
case ValueFollowing:
idx := wfr.RowIdx + wfr.StartBoundOffset
offset := MustBeDInt(wfr.StartBoundOffset)
idx := wfr.RowIdx + int(offset)
if idx >= wfr.PartitionSize() {
idx = wfr.unboundedFollowing()
}
Expand Down Expand Up @@ -142,15 +144,17 @@ func (wfr WindowFrameRun) FrameEndIdx() int {
}
switch wfr.Frame.Bounds.EndBound.BoundType {
case ValuePreceding:
idx := wfr.RowIdx - wfr.EndBoundOffset + 1
offset := MustBeDInt(wfr.EndBoundOffset)
idx := wfr.RowIdx - int(offset) + 1
if idx < 0 {
idx = 0
}
return idx
case CurrentRow:
return wfr.RowIdx + 1
case ValueFollowing:
idx := wfr.RowIdx + wfr.EndBoundOffset + 1
offset := MustBeDInt(wfr.EndBoundOffset)
idx := wfr.RowIdx + int(offset) + 1
if idx >= wfr.PartitionSize() {
idx = wfr.unboundedFollowing()
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/sql/sem/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,3 +508,24 @@ func IsValidArrayElementType(t T) bool {
return true
}
}

// IsDateTimeType returns true if the T is
// date- or time-related type.
func IsDateTimeType(t T) bool {
switch t {
case Date:
return true
case Time:
return true
case TimeTZ:
return true
case Timestamp:
return true
case TimestampTZ:
return true
case Interval:
return true
default:
return false
}
}
91 changes: 70 additions & 21 deletions pkg/sql/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/types"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/mon"
)
Expand Down Expand Up @@ -337,21 +338,8 @@ func (p *planner) constructWindowDefinitions(

// Validate frame of the window definition if present.
if windowDef.Frame != nil {
bounds := windowDef.Frame.Bounds
startBound, endBound := bounds.StartBound, bounds.EndBound
if startBound.OffsetExpr != nil {
typedStartOffsetExpr, err := tree.TypeCheckAndRequire(startBound.OffsetExpr, &p.semaCtx, types.Int, "window frame start")
if err != nil {
return err
}
startBound.OffsetExpr = typedStartOffsetExpr
}
if endBound != nil && endBound.OffsetExpr != nil {
typedEndOffsetExpr, err := tree.TypeCheckAndRequire(endBound.OffsetExpr, &p.semaCtx, types.Int, "window frame end")
if err != nil {
return err
}
endBound.OffsetExpr = typedEndOffsetExpr
if err := windowDef.Frame.TypeCheck(&p.semaCtx, &windowDef); err != nil {
return err
}
}

Expand Down Expand Up @@ -664,29 +652,74 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte
// OffsetExpr's must be integer expressions not containing any variables, aggregate functions, or window functions,
// so we need to make sure these expressions are evaluated before using offsets.
bounds := frameRun.Frame.Bounds
startBound, endBound := bounds.StartBound, bounds.EndBound
var requiredType types.T
switch frameRun.Frame.Mode {
case tree.ROWS:
// In ROWS mode, offsets must be non-null, non-negative integers.
// Non-nullity has already been checked, non-negativity is checked below.
requiredType = types.Int
case tree.RANGE:
// In RANGE mode, offsets must be non-null and non-negative.
// Non-nullity has already been checked, non-negativity is checked below.
if startBound.OffsetExpr != nil || (endBound != nil && endBound.OffsetExpr != nil) {
// At least one of the bounds is of type `value PRECEDING` or 'value FOLLOWING'.
// We require ordering on the single column that is the first argument
// to a window function that has this window frame. If the ordering is
// on the different column, we'll catch it later.
if len(windowFn.columnOrdering) != 1 {
return pgerror.NewErrorf(pgerror.CodeWindowingError, "RANGE mode requires that the ORDER BY clause specify exactly one column")
}
// Assumption: renderNode is a parent of windowNode, and its renders
// contain IndexedVars representing the input columns. To make sure
// that ordering is done on the appropriate column, we compare indices
// of IndexedVars.
switch r := n.plan.(type) {
case *renderNode:
argCol, ok := r.render[windowFn.argIdxStart].(*tree.IndexedVar)
if !ok {
panic("unexpected: render of the first window function argument is not an IndexedVar")
}
ordCol, ok := r.render[windowFn.columnOrdering[0].ColIdx].(*tree.IndexedVar)
if !ok {
panic("unexpected: render of the ordering column is not an IndexedVar")
}
if argCol.Idx != ordCol.Idx {
return pgerror.NewErrorf(pgerror.CodeWindowingError, "RANGE mode requires ordering be on the column over which window function calculation takes place")
}
requiredType = argCol.ResolvedType()
if types.IsDateTimeType(requiredType) {
// Spec: for datetime ordering columns, the required type is an 'interval'.
requiredType = types.Interval
}
default:
panic("unexpected: parent of windowNode is not renderNode")
}
}
default:
panic("unexpected WindowFrameMode")
}
if bounds.StartBound.OffsetExpr != nil {
typedStartOffset := bounds.StartBound.OffsetExpr.(tree.TypedExpr)
dStartOffset, err := typedStartOffset.Eval(evalCtx)
if err != nil {
return err
}
startOffset := int(tree.MustBeDInt(dStartOffset))
if startOffset < 0 {
if isNegative(evalCtx, dStartOffset) {
return pgerror.NewErrorf(pgerror.CodeInvalidParameterValueError, "frame starting offset must not be negative")
}
frameRun.StartBoundOffset = startOffset
frameRun.StartBoundOffset = dStartOffset
}
if bounds.EndBound != nil && bounds.EndBound.OffsetExpr != nil {
typedEndOffset := bounds.EndBound.OffsetExpr.(tree.TypedExpr)
dEndOffset, err := typedEndOffset.Eval(evalCtx)
if err != nil {
return err
}
endOffset := int(tree.MustBeDInt(dEndOffset))
if endOffset < 0 {
if isNegative(evalCtx, dEndOffset) {
return pgerror.NewErrorf(pgerror.CodeInvalidParameterValueError, "frame ending offset must not be negative")
}
frameRun.EndBoundOffset = endOffset
frameRun.EndBoundOffset = dEndOffset
}
}

Expand Down Expand Up @@ -845,6 +878,22 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte
return nil
}

// isNegative returns whether offset is negative.
func isNegative(evalCtx *tree.EvalContext, offset tree.Datum) bool {
switch o := offset.(type) {
case *tree.DInt:
return *o < 0
case *tree.DDecimal:
return o.Negative
case *tree.DFloat:
return *o < 0
case *tree.DInterval:
return o.Compare(evalCtx, &tree.DInterval{duration.Duration{}}) < 0
default:
panic("unexpected offset type")
}
}

// populateValues populates n.run.values with final datum values after computing
// window result values in n.run.windowValues.
func (n *windowNode) populateValues(ctx context.Context, evalCtx *tree.EvalContext) error {
Expand Down

0 comments on commit 6f93fec

Please sign in to comment.