diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index b84575be73a7..b331bbdbf5c5 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -2017,7 +2017,7 @@ func_name ::= | prefixed_column_path window_specification ::= - '(' opt_existing_window_name opt_partition_clause opt_sort_clause ')' + '(' opt_existing_window_name opt_partition_clause opt_sort_clause opt_frame_clause ')' window_name ::= name @@ -2090,6 +2090,11 @@ opt_partition_clause ::= 'PARTITION' 'BY' expr_list | +opt_frame_clause ::= + 'RANGE' frame_extent + | 'ROWS' frame_extent + | + extract_list ::= extract_arg 'FROM' a_expr | expr_list @@ -2129,6 +2134,10 @@ rowsfrom_item ::= partition_name ::= unrestricted_name +frame_extent ::= + frame_bound + | 'BETWEEN' frame_bound 'AND' frame_bound + extract_arg ::= 'identifier' | 'YEAR' @@ -2146,3 +2155,10 @@ substr_from ::= substr_for ::= 'FOR' a_expr + +frame_bound ::= + 'UNBOUNDED' 'PRECEDING' + | 'UNBOUNDED' 'FOLLOWING' + | 'CURRENT' 'ROW' + | a_expr 'PRECEDING' + | a_expr 'FOLLOWING' diff --git a/pkg/sql/logictest/testdata/logic_test/window b/pkg/sql/logictest/testdata/logic_test/window index e38989451d52..7c73fa1cef67 100644 --- a/pkg/sql/logictest/testdata/logic_test/window +++ b/pkg/sql/logictest/testdata/logic_test/window @@ -282,7 +282,7 @@ SELECT rank() FROM kv query error unknown signature: rank\(int\) SELECT rank(22) FROM kv -query error window function calls cannot be nested under avg\(\) +query error window function calls cannot be nested SELECT avg(avg(k) OVER ()) OVER () FROM kv ORDER BY 1 query error OVER specified, but round\(\) is neither a window function nor an aggregate function @@ -1514,3 +1514,212 @@ SELECT max(i) * (1/j) * (row_number() OVER (ORDER BY max(i))) FROM (SELECT 1 AS # regression test for #23798 until #10495 is fixed. statement error function reserved for internal use SELECT final_variance(1.2, 1.2, 123) OVER (PARTITION BY k) FROM kv + + +statement ok +CREATE TABLE products ( + group_id serial PRIMARY KEY, + group_name VARCHAR (255) NOT NULL, + product_name VARCHAR (255) NOT NULL, + price DECIMAL (11, 2), + priceInt INT, + priceFloat FLOAT +) + +statement ok +INSERT INTO products (group_name, product_name, price, priceInt, priceFloat) VALUES +('Smartphone', 'Microsoft Lumia', 200, 200, 200), +('Smartphone', 'HTC One', 400, 400, 400), +('Smartphone', 'Nexus', 500, 500, 500), +('Smartphone', 'iPhone', 900, 900, 900), +('Laptop', 'HP Elite', 1200, 1200, 1200), +('Laptop', 'Lenovo Thinkpad', 700, 700, 700), +('Laptop', 'Sony VAIO', 700, 700, 700), +('Laptop', 'Dell', 800, 800, 800), +('Tablet', 'iPad', 700, 700, 700), +('Tablet', 'Kindle Fire', 150, 150, 150), +('Tablet', 'Samsung', 200, 200, 200) + +statement error cannot copy window "w" because it has a frame clause +SELECT price, max(price) OVER (w ORDER BY price) AS max_price FROM products WINDOW w AS (PARTITION BY price ROWS UNBOUNDED PRECEDING); + +statement error frame starting offset must not be negative +SELECT price, avg(price) OVER (PARTITION BY price ROWS -1 PRECEDING) AS avg_price FROM products; + +statement error frame ending offset must not be negative +SELECT price, avg(price) OVER (PARTITION BY price ROWS BETWEEN 1 FOLLOWING AND -1 FOLLOWING) AS avg_price FROM products; + +statement error frame ending offset must not be negative +SELECT product_name, price, min(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS min_over_three, max(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING) AS max_over_partition FROM products ORDER BY group_id; + +statement error incompatible window frame start type: decimal +SELECT avg(price) OVER (PARTITION BY group_name ROWS 1.5 PRECEDING) AS avg_price FROM products; + +statement error incompatible window frame start type: decimal +SELECT avg(price) OVER (PARTITION BY group_name ROWS BETWEEN 1.5 PRECEDING AND UNBOUNDED FOLLOWING) AS avg_price FROM products; + +statement error incompatible window frame end type: decimal +SELECT avg(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND 1.5 FOLLOWING) AS avg_price FROM products; + +query TRT +SELECT product_name, price, first_value(product_name) OVER w AS first FROM products WHERE price = 200 OR price = 700 WINDOW w as (PARTITION BY price ORDER BY product_name RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) ORDER BY price, product_name; +---- +Microsoft Lumia 200.00 Microsoft Lumia +Samsung 200.00 Microsoft Lumia +Lenovo Thinkpad 700.00 Lenovo Thinkpad +Sony VAIO 700.00 Lenovo Thinkpad +iPad 700.00 Lenovo Thinkpad + +query TRT +SELECT product_name, price, last_value(product_name) OVER w AS last FROM products WHERE price = 200 OR price = 700 WINDOW w as (PARTITION BY price ORDER BY product_name RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) ORDER BY price, product_name; +---- +Microsoft Lumia 200.00 Samsung +Samsung 200.00 Samsung +Lenovo Thinkpad 700.00 iPad +Sony VAIO 700.00 iPad +iPad 700.00 iPad + +query TRT +SELECT product_name, price, nth_value(product_name, 2) OVER w AS second FROM products WHERE price = 200 OR price = 700 WINDOW w as (PARTITION BY price ORDER BY product_name RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) ORDER BY price, product_name; +---- +Microsoft Lumia 200.00 Samsung +Samsung 200.00 NULL +Lenovo Thinkpad 700.00 Sony VAIO +Sony VAIO 700.00 iPad +iPad 700.00 NULL + +query TTRR +SELECT product_name, group_name, price, avg(price) OVER (PARTITION BY group_name ORDER BY price, product_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS avg_of_three FROM products ORDER BY group_name, price, product_name; +---- +Lenovo Thinkpad Laptop 700.00 700.00 +Sony VAIO Laptop 700.00 733.33333333333333333 +Dell Laptop 800.00 900.00 +HP Elite Laptop 1200.00 1000.00 +Microsoft Lumia Smartphone 200.00 300.00 +HTC One Smartphone 400.00 366.66666666666666667 +Nexus Smartphone 500.00 600.00 +iPhone Smartphone 900.00 700.00 +Kindle Fire Tablet 150.00 175.00 +Samsung Tablet 200.00 350.00 +iPad Tablet 700.00 450.00 + +query TTRR +SELECT product_name, group_name, price, avg(priceFloat) OVER (PARTITION BY group_name ORDER BY price, product_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS avg_of_three_floats FROM products ORDER BY group_name, price, product_name; +---- +Lenovo Thinkpad Laptop 700.00 700 +Sony VAIO Laptop 700.00 733.3333333333334 +Dell Laptop 800.00 900 +HP Elite Laptop 1200.00 1000 +Microsoft Lumia Smartphone 200.00 300 +HTC One Smartphone 400.00 366.6666666666667 +Nexus Smartphone 500.00 600 +iPhone Smartphone 900.00 700 +Kindle Fire Tablet 150.00 175 +Samsung Tablet 200.00 350 +iPad Tablet 700.00 450 + +query TTRR +SELECT product_name, group_name, price, avg(priceInt) OVER (PARTITION BY group_name ORDER BY price, product_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS avg_of_three_ints FROM products ORDER BY group_name, price, product_name; +---- +Lenovo Thinkpad Laptop 700.00 700 +Sony VAIO Laptop 700.00 733.33333333333333333 +Dell Laptop 800.00 900 +HP Elite Laptop 1200.00 1000 +Microsoft Lumia Smartphone 200.00 300 +HTC One Smartphone 400.00 366.66666666666666667 +Nexus Smartphone 500.00 600 +iPhone Smartphone 900.00 700 +Kindle Fire Tablet 150.00 175 +Samsung Tablet 200.00 350 +iPad Tablet 700.00 450 + +query TTRR +SELECT group_name, product_name, price, avg(price) OVER (PARTITION BY group_name ROWS (SELECT count(*) FROM PRODUCTS WHERE price = 200) PRECEDING) AS running_avg_of_three FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 200.00 +Smartphone HTC One 400.00 300.00 +Smartphone Nexus 500.00 366.66666666666666667 +Smartphone iPhone 900.00 600.00 +Laptop HP Elite 1200.00 1200.00 +Laptop Lenovo Thinkpad 700.00 950.00 +Laptop Sony VAIO 700.00 866.66666666666666667 +Laptop Dell 800.00 733.33333333333333333 +Tablet iPad 700.00 700.00 +Tablet Kindle Fire 150.00 425.00 +Tablet Samsung 200.00 350.00 + +query TTRR +SELECT group_name, product_name, price, sum(price) OVER (PARTITION BY group_name ROWS 2 PRECEDING) AS running_sum FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 200.00 +Smartphone HTC One 400.00 600.00 +Smartphone Nexus 500.00 1100.00 +Smartphone iPhone 900.00 1800.00 +Laptop HP Elite 1200.00 1200.00 +Laptop Lenovo Thinkpad 700.00 1900.00 +Laptop Sony VAIO 700.00 2600.00 +Laptop Dell 800.00 2200.00 +Tablet iPad 700.00 700.00 +Tablet Kindle Fire 150.00 850.00 +Tablet Samsung 200.00 1050.00 + +query TTRT +SELECT group_name, product_name, price, array_agg(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING) AS array_agg_price FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 {200.00,400.00,500.00} +Smartphone HTC One 400.00 {200.00,400.00,500.00,900.00} +Smartphone Nexus 500.00 {400.00,500.00,900.00} +Smartphone iPhone 900.00 {500.00,900.00} +Laptop HP Elite 1200.00 {1200.00,700.00,700.00} +Laptop Lenovo Thinkpad 700.00 {1200.00,700.00,700.00,800.00} +Laptop Sony VAIO 700.00 {700.00,700.00,800.00} +Laptop Dell 800.00 {700.00,800.00} +Tablet iPad 700.00 {700.00,150.00,200.00} +Tablet Kindle Fire 150.00 {700.00,150.00,200.00} +Tablet Samsung 200.00 {150.00,200.00} + +query TTRR +SELECT group_name, product_name, price, avg(price) OVER (PARTITION BY group_name RANGE UNBOUNDED PRECEDING) AS avg_price FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 500.00 +Smartphone HTC One 400.00 500.00 +Smartphone Nexus 500.00 500.00 +Smartphone iPhone 900.00 500.00 +Laptop HP Elite 1200.00 850.00 +Laptop Lenovo Thinkpad 700.00 850.00 +Laptop Sony VAIO 700.00 850.00 +Laptop Dell 800.00 850.00 +Tablet iPad 700.00 350.00 +Tablet Kindle Fire 150.00 350.00 +Tablet Samsung 200.00 350.00 + +query TTRT +SELECT group_name, product_name, price, min(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND 2 PRECEDING) AS min_over_empty_frame FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 NULL +Smartphone HTC One 400.00 NULL +Smartphone Nexus 500.00 NULL +Smartphone iPhone 900.00 NULL +Laptop HP Elite 1200.00 NULL +Laptop Lenovo Thinkpad 700.00 NULL +Laptop Sony VAIO 700.00 NULL +Laptop Dell 800.00 NULL +Tablet iPad 700.00 NULL +Tablet Kindle Fire 150.00 NULL +Tablet Samsung 200.00 NULL + + +query TRRR +SELECT product_name, price, min(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS min_over_three, max(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS max_over_partition FROM products ORDER BY group_id; +---- +Microsoft Lumia 200.00 200.00 900.00 +HTC One 400.00 200.00 900.00 +Nexus 500.00 400.00 900.00 +iPhone 900.00 500.00 900.00 +HP Elite 1200.00 700.00 1200.00 +Lenovo Thinkpad 700.00 700.00 1200.00 +Sony VAIO 700.00 700.00 1200.00 +Dell 800.00 700.00 1200.00 +iPad 700.00 150.00 700.00 +Kindle Fire 150.00 150.00 700.00 +Samsung 200.00 150.00 700.00 diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 441cce990a27..afab2390d2e6 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -756,6 +756,34 @@ func TestParse(t *testing.T) { {`SELECT avg(1) OVER (PARTITION BY b ORDER BY c) FROM t`}, {`SELECT avg(1) OVER (w PARTITION BY b ORDER BY c) FROM t`}, + {`SELECT avg(1) OVER (ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ROWS 1 PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ROWS CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (RANGE UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (RANGE CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (RANGE BETWEEN CURRENT ROW AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (w ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (PARTITION BY b ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ORDER BY c ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (PARTITION BY b ORDER BY c ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (w PARTITION BY b ORDER BY c ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT a FROM t UNION SELECT 1 FROM t`}, {`SELECT a FROM t UNION SELECT 1 FROM t UNION SELECT 1 FROM t`}, {`SELECT a FROM t UNION ALL SELECT 1 FROM t`}, @@ -1989,6 +2017,83 @@ SELECT max(a ORDER BY b) FROM ab ^ HINT: See: https://github.com/cockroachdb/cockroach/issues/23620`, }, + { + `SELECT avg(1) OVER (RANGE 1 PRECEDING) FROM t`, + `RANGE PRECEDING is only supported with UNBOUNDED at or near "preceding" +SELECT avg(1) OVER (RANGE 1 PRECEDING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (RANGE BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) FROM t`, + `RANGE FOLLOWING is only supported with UNBOUNDED at or near "following" +SELECT avg(1) OVER (RANGE BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) FROM t`, + `RANGE PRECEDING is only supported with UNBOUNDED at or near "preceding" +SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM t`, + `RANGE FOLLOWING is only supported with UNBOUNDED at or near "following" +SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS UNBOUNDED FOLLOWING) FROM t`, + `frame start cannot be UNBOUNDED FOLLOWING at or near "following" +SELECT avg(1) OVER (ROWS UNBOUNDED FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS 1 FOLLOWING) FROM t`, + `frame starting from following row cannot end with current row at or near "following" +SELECT avg(1) OVER (ROWS 1 FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED FOLLOWING AND UNBOUNDED FOLLOWING) FROM t`, + `frame start cannot be UNBOUNDED FOLLOWING at or near "following" +SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED FOLLOWING AND UNBOUNDED FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED PRECEDING) FROM t`, + `frame end cannot be UNBOUNDED PRECEDING at or near "preceding" +SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED PRECEDING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND 1 PRECEDING) FROM t`, + `frame starting from current row cannot have preceding rows at or near "preceding" +SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND 1 PRECEDING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM t`, + `frame starting from following row cannot have preceding rows at or near "preceding" +SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND CURRENT ROW) FROM t`, + `frame starting from following row cannot have preceding rows at or near "row" +SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND CURRENT ROW) FROM t + ^ +`, + }, } for _, d := range testData { _, err := Parse(d.sql) diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 7c1bc6ebb4f4..ee0dc508a2fb 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -315,6 +315,15 @@ func (u *sqlSymUnion) orders() []*tree.Order { func (u *sqlSymUnion) groupBy() tree.GroupBy { return u.val.(tree.GroupBy) } +func (u *sqlSymUnion) windowFrame() *tree.WindowFrame { + return u.val.(*tree.WindowFrame) +} +func (u *sqlSymUnion) windowFrameBounds() tree.WindowFrameBounds { + return u.val.(tree.WindowFrameBounds) +} +func (u *sqlSymUnion) windowFrameBound() *tree.WindowFrameBound { + return u.val.(*tree.WindowFrameBound) +} func (u *sqlSymUnion) distinctOn() tree.DistinctOn { return u.val.(tree.DistinctOn) } @@ -932,7 +941,9 @@ func newNameFromStr(s string) *tree.Name { %type window_clause window_definition_list %type <*tree.WindowDef> window_definition over_clause window_specification %type opt_existing_window_name -%type opt_frame_clause frame_extent frame_bound +%type <*tree.WindowFrame> opt_frame_clause +%type frame_extent +%type <*tree.WindowFrameBound> frame_bound %type <[]tree.ColumnID> opt_tableref_col_list tableref_col_list @@ -7148,6 +7159,7 @@ window_specification: RefName: tree.Name($2), Partitions: $3.exprs(), OrderBy: $4.orderBy(), + Frame: $5.windowFrame(), } } @@ -7182,23 +7194,110 @@ opt_partition_clause: // This is only a subset of the full SQL:2008 frame_clause grammar. We don't // support yet. opt_frame_clause: - RANGE frame_extent { return unimplemented(sqllex, "frame range") } -| ROWS frame_extent { return unimplemented(sqllex, "frame rows") } -| /* EMPTY */ {} + RANGE frame_extent + { + bounds := $2.windowFrameBounds() + startBound := bounds.StartBound + endBound := bounds.EndBound + switch { + case startBound.BoundType == tree.ValuePreceding: + sqllex.Error("RANGE PRECEDING is only supported with UNBOUNDED") + return 1 + case startBound.BoundType == tree.ValueFollowing: + sqllex.Error("RANGE FOLLOWING is only supported with UNBOUNDED") + return 1 + case endBound != nil && endBound.BoundType == tree.ValuePreceding: + sqllex.Error("RANGE PRECEDING is only supported with UNBOUNDED") + return 1 + case endBound != nil && endBound.BoundType == tree.ValueFollowing: + sqllex.Error("RANGE FOLLOWING is only supported with UNBOUNDED") + return 1 + } + $$.val = &tree.WindowFrame{ + Mode: tree.RANGE, + Bounds: bounds, + } + } +| ROWS frame_extent + { + $$.val = &tree.WindowFrame{ + Mode: tree.ROWS, + Bounds: $2.windowFrameBounds(), + } + } +| /* EMPTY */ + { + $$.val = (*tree.WindowFrame)(nil) + } frame_extent: - frame_bound { return unimplemented(sqllex, "frame_extent") } -| BETWEEN frame_bound AND frame_bound { return unimplemented(sqllex, "frame_extent") } + frame_bound + { + startBound := $1.windowFrameBound() + switch { + case startBound.BoundType == tree.UnboundedFollowing: + sqllex.Error("frame start cannot be UNBOUNDED FOLLOWING") + return 1 + case startBound.BoundType == tree.ValueFollowing: + sqllex.Error("frame starting from following row cannot end with current row") + return 1 + } + $$.val = tree.WindowFrameBounds{StartBound: startBound} + } +| BETWEEN frame_bound AND frame_bound + { + startBound := $2.windowFrameBound() + endBound := $4.windowFrameBound() + switch { + case startBound.BoundType == tree.UnboundedFollowing: + sqllex.Error("frame start cannot be UNBOUNDED FOLLOWING") + return 1 + case endBound.BoundType == tree.UnboundedPreceding: + sqllex.Error("frame end cannot be UNBOUNDED PRECEDING") + return 1 + case startBound.BoundType == tree.CurrentRow && endBound.BoundType == tree.ValuePreceding: + sqllex.Error("frame starting from current row cannot have preceding rows") + return 1 + case startBound.BoundType == tree.ValueFollowing && endBound.BoundType == tree.ValuePreceding: + sqllex.Error("frame starting from following row cannot have preceding rows") + return 1 + case startBound.BoundType == tree.ValueFollowing && endBound.BoundType == tree.CurrentRow: + sqllex.Error("frame starting from following row cannot have preceding rows") + return 1 + } + $$.val = tree.WindowFrameBounds{StartBound: startBound, EndBound: endBound} + } // This is used for both frame start and frame end, with output set up on the // assumption it's frame start; the frame_extent productions must reject // invalid cases. frame_bound: - UNBOUNDED PRECEDING { return unimplemented(sqllex, "frame_bound") } -| UNBOUNDED FOLLOWING { return unimplemented(sqllex, "frame_bound") } -| CURRENT ROW { return unimplemented(sqllex, "frame_bound") } -| a_expr PRECEDING { return unimplemented(sqllex, "frame_bound") } -| a_expr FOLLOWING { return unimplemented(sqllex, "frame_bound") } + UNBOUNDED PRECEDING + { + $$.val = &tree.WindowFrameBound{BoundType: tree.UnboundedPreceding} + } +| UNBOUNDED FOLLOWING + { + $$.val = &tree.WindowFrameBound{BoundType: tree.UnboundedFollowing} + } +| CURRENT ROW + { + $$.val = &tree.WindowFrameBound{BoundType: tree.CurrentRow} + } +| a_expr PRECEDING + { + $$.val = &tree.WindowFrameBound{ + OffsetExpr: $1.expr(), + BoundType: tree.ValuePreceding, + } + } +| a_expr FOLLOWING + { + $$.val = &tree.WindowFrameBound{ + OffsetExpr: $1.expr(), + BoundType: tree.ValueFollowing, + } + } // Supporting nonterminals for expressions. diff --git a/pkg/sql/sem/builtins/aggregate_builtins.go b/pkg/sql/sem/builtins/aggregate_builtins.go index 50ed2eb923bf..da914c3f483a 100644 --- a/pkg/sql/sem/builtins/aggregate_builtins.go +++ b/pkg/sql/sem/builtins/aggregate_builtins.go @@ -156,7 +156,7 @@ var aggregates = map[string]builtinDefinition{ ReturnType: tree.FixedReturnType(types.Int), AggregateFunc: newCountRowsAggregate, WindowFunc: func(params []types.T, evalCtx *tree.EvalContext) tree.WindowFunc { - return newAggregateWindow(newCountRowsAggregate(params, evalCtx)) + return newFramableAggregateWindow(newCountRowsAggregate(params, evalCtx)) }, Info: "Calculates the number of rows.", }, @@ -318,7 +318,7 @@ func makeAggOverloadWithReturnType( ReturnType: retType, AggregateFunc: f, WindowFunc: func(params []types.T, evalCtx *tree.EvalContext) tree.WindowFunc { - return newAggregateWindow(f(params, evalCtx)) + return newFramableAggregateWindow(f(params, evalCtx)) }, Info: info, } diff --git a/pkg/sql/sem/builtins/window_builtins.go b/pkg/sql/sem/builtins/window_builtins.go index f19500b89ff6..76cf70408af1 100644 --- a/pkg/sql/sem/builtins/window_builtins.go +++ b/pkg/sql/sem/builtins/window_builtins.go @@ -125,6 +125,7 @@ func makeWindowOverload( } var _ tree.WindowFunc = &aggregateWindowFunc{} +var _ tree.WindowFunc = &framableAggregateWindowFunc{} var _ tree.WindowFunc = &rowNumberWindow{} var _ tree.WindowFunc = &rankWindow{} var _ tree.WindowFunc = &denseRankWindow{} @@ -143,21 +144,17 @@ type aggregateWindowFunc struct { peerRes tree.Datum } -func newAggregateWindow(agg tree.AggregateFunc) tree.WindowFunc { - return &aggregateWindowFunc{agg: agg} -} - func (w *aggregateWindowFunc) Compute( - ctx context.Context, evalCtx *tree.EvalContext, wf tree.WindowFrame, + ctx context.Context, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - if !wf.FirstInPeerGroup() { + if !wfr.FirstInPeerGroup() { return w.peerRes, nil } // Accumulate all values in the peer group at the same time, as these // must return the same value. - for i := 0; i < wf.PeerRowCount; i++ { - args := wf.ArgsWithRowOffset(i) + for i := 0; i < wfr.PeerRowCount; i++ { + args := wfr.ArgsWithRowOffset(i) var value tree.Datum // COUNT_ROWS takes no arguments. if len(args) > 0 { @@ -169,7 +166,6 @@ func (w *aggregateWindowFunc) Compute( } // Retrieve the value for the entire peer group, save it, and return it. - peerRes, err := w.agg.Result() if err != nil { return nil, err @@ -182,6 +178,71 @@ func (w *aggregateWindowFunc) Close(ctx context.Context, evalCtx *tree.EvalConte w.agg.Close(ctx) } +// framableAggregateWindowFunc is a wrapper around aggregateWindowFunc that allows +// to reset the aggregate by creating a new instance via a provided constructor. +type framableAggregateWindowFunc struct { + agg *aggregateWindowFunc + aggConstructor func(*tree.EvalContext) tree.AggregateFunc +} + +func newFramableAggregateWindow(agg tree.AggregateFunc) tree.WindowFunc { + return &framableAggregateWindowFunc{agg: &aggregateWindowFunc{agg: agg}} +} + +func (w *framableAggregateWindowFunc) Compute( + ctx context.Context, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun, +) (tree.Datum, error) { + if !wfr.FirstInPeerGroup() { + return w.agg.peerRes, nil + } + if w.aggConstructor == nil { + // No constructor is given, so we use default approach. + return w.agg.Compute(ctx, evalCtx, wfr) + } + + // When aggConstructor is provided, we want to dispose of the old aggregate function + // and construct a new one for the computation. + w.agg.Close(ctx, evalCtx) + *w.agg = aggregateWindowFunc{w.aggConstructor(evalCtx), tree.DNull} + + // Accumulate all values in the window frame. + for i := wfr.FrameStartIdx(); i < wfr.FrameEndIdx(); i++ { + args := wfr.ArgsByRowIdx(i) + var value tree.Datum + // COUNT_ROWS takes no arguments. + if len(args) > 0 { + value = args[0] + } + if err := w.agg.agg.Add(ctx, value); err != nil { + return nil, err + } + } + + // Retrieve the value for the entire peer group, save it, and return it. + peerRes, err := w.agg.agg.Result() + if err != nil { + return nil, err + } + w.agg.peerRes = peerRes + return w.agg.peerRes, nil +} + +func (w *framableAggregateWindowFunc) Close(ctx context.Context, evalCtx *tree.EvalContext) { + w.agg.Close(ctx, evalCtx) +} + +// AddAggregateConstructorToFramableAggregate adds provided constructor to framableAggregateWindowFunc +// so that aggregates can be 'reset' when computing values over a window frame. +func AddAggregateConstructorToFramableAggregate( + windowFunc tree.WindowFunc, aggConstructor func(*tree.EvalContext) tree.AggregateFunc, +) { + // We only want to add aggConstructor to framableAggregateWindowFunc's since + // all non-aggregates builtins specific to window functions support framing "natively". + if framableAgg, ok := windowFunc.(*framableAggregateWindowFunc); ok { + framableAgg.aggConstructor = aggConstructor + } +} + // rowNumberWindow computes the number of the current row within its partition, // counting from 1. type rowNumberWindow struct{} @@ -191,9 +252,9 @@ func newRowNumberWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (rowNumberWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - return tree.NewDInt(tree.DInt(wf.RowIdx + 1 /* one-indexed */)), nil + return tree.NewDInt(tree.DInt(wfr.RowIdx + 1 /* one-indexed */)), nil } func (rowNumberWindow) Close(context.Context, *tree.EvalContext) {} @@ -208,10 +269,10 @@ func newRankWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (w *rankWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - if wf.FirstInPeerGroup() { - w.peerRes = tree.NewDInt(tree.DInt(wf.Rank())) + if wfr.FirstInPeerGroup() { + w.peerRes = tree.NewDInt(tree.DInt(wfr.Rank())) } return w.peerRes, nil } @@ -229,9 +290,9 @@ func newDenseRankWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (w *denseRankWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - if wf.FirstInPeerGroup() { + if wfr.FirstInPeerGroup() { w.denseRank++ w.peerRes = tree.NewDInt(tree.DInt(w.denseRank)) } @@ -253,16 +314,16 @@ func newPercentRankWindow([]types.T, *tree.EvalContext) tree.WindowFunc { var dfloatZero = tree.NewDFloat(0) func (w *percentRankWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { // Return zero if there's only one row, per spec. - if wf.RowCount() <= 1 { + if wfr.PartitionSize() <= 1 { return dfloatZero, nil } - if wf.FirstInPeerGroup() { + if wfr.FirstInPeerGroup() { // (rank - 1) / (total rows - 1) - w.peerRes = tree.NewDFloat(tree.DFloat(wf.Rank()-1) / tree.DFloat(wf.RowCount()-1)) + w.peerRes = tree.NewDFloat(tree.DFloat(wfr.Rank()-1) / tree.DFloat(wfr.PartitionSize()-1)) } return w.peerRes, nil } @@ -280,11 +341,11 @@ func newCumulativeDistWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (w *cumulativeDistWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - if wf.FirstInPeerGroup() { + if wfr.FirstInPeerGroup() { // (number of rows preceding or peer with current row) / (total rows) - w.peerRes = tree.NewDFloat(tree.DFloat(wf.FrameSize()) / tree.DFloat(wf.RowCount())) + w.peerRes = tree.NewDFloat(tree.DFloat(wfr.DefaultFrameSize()) / tree.DFloat(wfr.PartitionSize())) } return w.peerRes, nil } @@ -308,13 +369,13 @@ var errInvalidArgumentForNtile = pgerror.NewErrorf( pgerror.CodeInvalidParameterValueError, "argument of ntile() must be greater than zero") func (w *ntileWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { if w.ntile == nil { // If this is the first call to ntileWindow.Compute, set up the buckets. - total := wf.RowCount() + total := wfr.PartitionSize() - arg := wf.Args()[0] + arg := wfr.Args()[0] if arg == tree.DNull { // per spec: If argument is the null value, then the result is the null value. return tree.DNull, nil @@ -378,11 +439,11 @@ func makeLeadLagWindowConstructor( } func (w *leadLagWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { offset := 1 if w.withOffset { - offsetArg := wf.Args()[1] + offsetArg := wfr.Args()[1] if offsetArg == tree.DNull { return tree.DNull, nil } @@ -392,16 +453,16 @@ func (w *leadLagWindow) Compute( offset *= -1 } - if targetRow := wf.RowIdx + offset; targetRow < 0 || targetRow >= wf.RowCount() { + if targetRow := wfr.RowIdx + offset; targetRow < 0 || targetRow >= wfr.PartitionSize() { // Target row is out of the partition; supply default value if provided, // otherwise return NULL. if w.withDefault { - return wf.Args()[2], nil + return wfr.Args()[2], nil } return tree.DNull, nil } - return wf.ArgsWithRowOffset(offset)[0], nil + return wfr.ArgsWithRowOffset(offset)[0], nil } func (w *leadLagWindow) Close(context.Context, *tree.EvalContext) {} @@ -414,9 +475,9 @@ func newFirstValueWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (firstValueWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - return wf.Rows[0].Row[wf.ArgIdxStart], nil + return wfr.Rows[wfr.FrameStartIdx()].Row[wfr.ArgIdxStart], nil } func (firstValueWindow) Close(context.Context, *tree.EvalContext) {} @@ -429,9 +490,9 @@ func newLastValueWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (lastValueWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - return wf.Rows[wf.FrameSize()-1].Row[wf.ArgIdxStart], nil + return wfr.Rows[wfr.FrameEndIdx()-1].Row[wfr.ArgIdxStart], nil } func (lastValueWindow) Close(context.Context, *tree.EvalContext) {} @@ -448,9 +509,9 @@ var errInvalidArgumentForNthValue = pgerror.NewErrorf( pgerror.CodeInvalidParameterValueError, "argument of nth_value() must be greater than zero") func (nthValueWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - arg := wf.Args()[1] + arg := wfr.Args()[1] if arg == tree.DNull { return tree.DNull, nil } @@ -460,12 +521,10 @@ func (nthValueWindow) Compute( return nil, errInvalidArgumentForNthValue } - // per spec: Only consider the rows within the "window frame", which by default contains - // the rows from the start of the partition through the last peer of the current row. - if nth > wf.FrameSize() { + if nth > wfr.FrameSize() { return tree.DNull, nil } - return wf.Rows[nth-1].Row[wf.ArgIdxStart], nil + return wfr.Rows[wfr.FrameStartIdx()+nth-1].Row[wfr.ArgIdxStart], nil } func (nthValueWindow) Close(context.Context, *tree.EvalContext) {} diff --git a/pkg/sql/sem/tree/select.go b/pkg/sql/sem/tree/select.go index 53ae2f8a313d..3201eaeffac1 100644 --- a/pkg/sql/sem/tree/select.go +++ b/pkg/sql/sem/tree/select.go @@ -541,6 +541,18 @@ func (node *Limit) Format(ctx *FmtCtx) { } } +// RowsFromExpr represents a ROWS FROM(...) expression. +type RowsFromExpr struct { + Items Exprs +} + +// Format implements the NodeFormatter interface. +func (node *RowsFromExpr) Format(ctx *FmtCtx) { + ctx.WriteString("ROWS FROM (") + ctx.FormatNode(&node.Items) + ctx.WriteByte(')') +} + // Window represents a WINDOW clause. type Window []*WindowDef @@ -562,6 +574,7 @@ type WindowDef struct { RefName Name Partitions Exprs OrderBy OrderBy + Frame *WindowFrame } // Format implements the NodeFormatter interface. @@ -590,21 +603,108 @@ func (node *WindowDef) Format(ctx *FmtCtx) { ctx.WriteString(orderByStr[1:]) } needSpaceSeparator = true - _ = needSpaceSeparator // avoid compiler warning until TODO below is addressed. } - // TODO(nvanbenschoten): Support Window Frames. - // if node.Frame != nil {} - ctx.WriteByte(')') + if node.Frame != nil { + if needSpaceSeparator { + ctx.WriteRune(' ') + } + ctx.FormatNode(node.Frame) + } + ctx.WriteRune(')') } -// RowsFromExpr represents a ROWS FROM(...) expression. -type RowsFromExpr struct { - Items Exprs +// WindowFrameMode indicates which mode of framing is used. +type WindowFrameMode int + +const ( + // RANGE is the mode of specifying frame in terms of logical range (e.g. 100 units cheaper). + RANGE WindowFrameMode = iota + // ROWS is the mode of specifying frame in terms of physical offsets (e.g. 1 row before etc). + ROWS +) + +// WindowFrameBoundType indicates which type of boundary is used. +type WindowFrameBoundType int + +const ( + // UnboundedPreceding represents UNBOUNDED PRECEDING type of boundary. + UnboundedPreceding WindowFrameBoundType = iota + // ValuePreceding represents 'value' PRECEDING type of boundary. + ValuePreceding + // CurrentRow represents CURRENT ROW type of boundary. + CurrentRow + // ValueFollowing represents 'value' FOLLOWING type of boundary. + ValueFollowing + // UnboundedFollowing represents UNBOUNDED FOLLOWING type of boundary. + UnboundedFollowing +) + +// WindowFrameBound specifies the offset and the type of boundary. +type WindowFrameBound struct { + BoundType WindowFrameBoundType + OffsetExpr Expr +} + +// WindowFrameBounds specifies boundaries of the window frame. +// The row at StartBound is included whereas the row at EndBound is not. +type WindowFrameBounds struct { + StartBound *WindowFrameBound + EndBound *WindowFrameBound +} + +// WindowFrame represents static state of window frame over which calculations are made. +type WindowFrame struct { + Mode WindowFrameMode // the mode of framing being used + Bounds WindowFrameBounds // the bounds of the frame +} + +func (boundary *WindowFrameBound) write(ctx *FmtCtx) { + switch boundary.BoundType { + case UnboundedPreceding: + ctx.WriteString("UNBOUNDED PRECEDING") + case ValuePreceding: + ctx.FormatNode(boundary.OffsetExpr) + ctx.WriteString(" PRECEDING") + case CurrentRow: + ctx.WriteString("CURRENT ROW") + case ValueFollowing: + ctx.FormatNode(boundary.OffsetExpr) + ctx.WriteString(" FOLLOWING") + case UnboundedFollowing: + ctx.WriteString("UNBOUNDED FOLLOWING") + default: + panic("unexpected WindowFrameBoundType") + } } // Format implements the NodeFormatter interface. -func (node *RowsFromExpr) Format(ctx *FmtCtx) { - ctx.WriteString("ROWS FROM (") - ctx.FormatNode(&node.Items) - ctx.WriteByte(')') +func (wf *WindowFrame) Format(ctx *FmtCtx) { + switch wf.Mode { + case RANGE: + ctx.WriteString("RANGE ") + case ROWS: + ctx.WriteString("ROWS ") + default: + panic("unexpected WindowFrameMode") + } + if wf.Bounds.EndBound != nil { + ctx.WriteString("BETWEEN ") + wf.Bounds.StartBound.write(ctx) + ctx.WriteString(" AND ") + wf.Bounds.EndBound.write(ctx) + } else { + wf.Bounds.StartBound.write(ctx) + } +} + +// Copy returns a deep copy of wf. +func (wf *WindowFrame) Copy() *WindowFrame { + frameCopy := &WindowFrame{Mode: wf.Mode} + startBoundCopy := *wf.Bounds.StartBound + frameCopy.Bounds = WindowFrameBounds{&startBoundCopy, nil} + if wf.Bounds.EndBound != nil { + endBoundCopy := *wf.Bounds.EndBound + frameCopy.Bounds.EndBound = &endBoundCopy + } + return frameCopy } diff --git a/pkg/sql/sem/tree/type_check.go b/pkg/sql/sem/tree/type_check.go index 97362beb8cec..a5cb82df781c 100644 --- a/pkg/sql/sem/tree/type_check.go +++ b/pkg/sql/sem/tree/type_check.go @@ -758,6 +758,24 @@ func (expr *FuncExpr) TypeCheck(ctx *SemaContext, desired types.T) (TypedExpr, e } expr.WindowDef.OrderBy[i].Expr = typedOrderBy } + if expr.WindowDef.Frame != nil { + bounds := expr.WindowDef.Frame.Bounds + startBound, endBound := bounds.StartBound, bounds.EndBound + if startBound.OffsetExpr != nil { + typedStartOffsetExpr, err := typeCheckAndRequire(ctx, startBound.OffsetExpr, types.Int, "window frame start") + if err != nil { + return nil, err + } + startBound.OffsetExpr = typedStartOffsetExpr + } + if endBound != nil && endBound.OffsetExpr != nil { + typedEndOffsetExpr, err := typeCheckAndRequire(ctx, endBound.OffsetExpr, types.Int, "window frame end") + if err != nil { + return nil, err + } + endBound.OffsetExpr = typedEndOffsetExpr + } + } } else { // Make sure the window function builtins are used as window function applications. if def.Class == WindowClass { diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go index 2fe659febe6d..d4c9dc29d25f 100644 --- a/pkg/sql/sem/tree/walk.go +++ b/pkg/sql/sem/tree/walk.go @@ -222,6 +222,9 @@ func (expr *FuncExpr) CopyNode() *FuncExpr { } windowDef.OrderBy = newOrderBy } + if windowDef.Frame != nil { + windowDef.Frame = windowDef.Frame.Copy() + } } return &exprCopy } @@ -260,6 +263,28 @@ func (expr *FuncExpr) Walk(v Visitor) Expr { ret.WindowDef.OrderBy[i].Expr = e } } + if expr.WindowDef.Frame != nil { + startBound, endBound := expr.WindowDef.Frame.Bounds.StartBound, expr.WindowDef.Frame.Bounds.EndBound + if startBound.OffsetExpr != nil { + e, changed := WalkExpr(v, startBound.OffsetExpr) + if changed { + if ret == expr { + ret = expr.CopyNode() + } + ret.WindowDef.Frame.Bounds.StartBound.OffsetExpr = e + } + } + if endBound != nil && endBound.OffsetExpr != nil { + e, changed := WalkExpr(v, endBound.OffsetExpr) + if changed { + if ret == expr { + ret = expr.CopyNode() + } + ret.WindowDef.Frame.Bounds.EndBound.OffsetExpr = e + } + } + } + } if expr.Filter != nil { e, changed := WalkExpr(v, expr.Filter) diff --git a/pkg/sql/sem/tree/window_funs.go b/pkg/sql/sem/tree/window_funs.go index bc7a89d885ca..e9ec8eba1f03 100644 --- a/pkg/sql/sem/tree/window_funs.go +++ b/pkg/sql/sem/tree/window_funs.go @@ -14,7 +14,9 @@ package tree -import "context" +import ( + "context" +) // IndexedRow is a row with a corresponding index. type IndexedRow struct { @@ -22,12 +24,15 @@ type IndexedRow struct { Row Datums } -// WindowFrame is a view into a subset of data over which calculations are made. -type WindowFrame struct { +// WindowFrameRun contains the runtime state of window frame during calculations. +type WindowFrameRun struct { // constant for all calls to WindowFunc.Add - Rows []IndexedRow - ArgIdxStart int // the index which arguments to the window function begin - ArgCount int // the number of window function arguments + Rows []IndexedRow + ArgIdxStart int // the index which arguments to the window function begin + ArgCount int // the number of window function arguments + Frame *WindowFrame // If non-nil, Frame represents the frame specification of this window. If nil, default frame is used. + StartBoundOffset int + EndBoundOffset int // changes for each row (each call to WindowFunc.Add) RowIdx int // the current row index @@ -37,39 +42,165 @@ type WindowFrame struct { PeerRowCount int // the number of rows in the current peer group } -// Rank returns the rank of this frame. -func (wf WindowFrame) Rank() int { - return wf.RowIdx + 1 +// FrameStartIdx returns the index of starting row in the frame (which is the first to be included). +func (wfr WindowFrameRun) FrameStartIdx() int { + if wfr.Frame == nil { + return 0 + } + switch wfr.Frame.Mode { + case RANGE: + switch wfr.Frame.Bounds.StartBound.BoundType { + case UnboundedPreceding: + return 0 + case ValuePreceding: + // TODO(yuzefovich): Currently, it is not supported, and this case should not be reached. + panic("unsupported WindowFrameBoundType in RANGE mode") + case CurrentRow: + // Spec: in RANGE mode CURRENT ROW means that the frame starts with the current row's first peer. + return wfr.FirstPeerIdx + case ValueFollowing: + // TODO(yuzefovich): Currently, it is not supported, and this case should not be reached. + panic("unsupported WindowFrameBoundType in RANGE mode") + default: + panic("unexpected WindowFrameBoundType in RANGE mode") + } + case ROWS: + switch wfr.Frame.Bounds.StartBound.BoundType { + case UnboundedPreceding: + return 0 + case ValuePreceding: + idx := wfr.RowIdx - wfr.StartBoundOffset + if idx < 0 { + idx = 0 + } + return idx + case CurrentRow: + return wfr.RowIdx + case ValueFollowing: + idx := wfr.RowIdx + wfr.StartBoundOffset + if idx >= wfr.PartitionSize() { + idx = wfr.unboundedFollowing() + } + return idx + default: + panic("unexpected WindowFrameBoundType in ROWS mode") + } + default: + panic("unexpected WindowFrameMode") + } } -// RowCount returns the number of rows in this frame. -func (wf WindowFrame) RowCount() int { - return len(wf.Rows) +// FrameEndIdx returns the index of the first row after the frame. +func (wfr WindowFrameRun) FrameEndIdx() int { + if wfr.Frame == nil { + return wfr.DefaultFrameSize() + } + switch wfr.Frame.Mode { + case RANGE: + if wfr.Frame.Bounds.EndBound == nil { + // We're using default value of CURRENT ROW when EndBound is omitted. + // Spec: in RANGE mode CURRENT ROW means that the frame ends with the current row's last peer. + return wfr.DefaultFrameSize() + } + switch wfr.Frame.Bounds.EndBound.BoundType { + case ValuePreceding: + // TODO(yuzefovich): Currently, it is not supported, and this case should not be reached. + panic("unsupported WindowFrameBoundType in RANGE mode") + case CurrentRow: + return wfr.DefaultFrameSize() + case ValueFollowing: + // TODO(yuzefovich): Currently, it is not supported, and this case should not be reached. + panic("unsupported WindowFrameBoundType in RANGE mode") + case UnboundedFollowing: + return wfr.unboundedFollowing() + default: + panic("unexpected WindowFrameBoundType in RANGE mode") + } + case ROWS: + if wfr.Frame.Bounds.EndBound == nil { + // We're using default value of CURRENT ROW when EndBound is omitted. + return wfr.RowIdx + 1 + } + switch wfr.Frame.Bounds.EndBound.BoundType { + case ValuePreceding: + idx := wfr.RowIdx - wfr.EndBoundOffset + 1 + if idx < 0 { + idx = 0 + } + return idx + case CurrentRow: + return wfr.RowIdx + 1 + case ValueFollowing: + idx := wfr.RowIdx + wfr.EndBoundOffset + 1 + if idx >= wfr.PartitionSize() { + idx = wfr.unboundedFollowing() + } + return idx + case UnboundedFollowing: + return wfr.unboundedFollowing() + default: + panic("unexpected WindowFrameBoundType in ROWS mode") + } + default: + panic("unexpected WindowFrameMode") + } } -// FrameSize returns the size of this frame. -// TODO(nvanbenschoten): This definition only holds while we don't support -// frame specification (RANGE or ROWS) in the OVER clause. -func (wf WindowFrame) FrameSize() int { - return wf.FirstPeerIdx + wf.PeerRowCount +// FrameSize returns the number of rows in the current frame. +func (wfr WindowFrameRun) FrameSize() int { + if wfr.Frame == nil { + return wfr.DefaultFrameSize() + } + size := wfr.FrameEndIdx() - wfr.FrameStartIdx() + if size <= 0 { + size = 0 + } + return size +} + +// Rank returns the rank of the current row. +func (wfr WindowFrameRun) Rank() int { + return wfr.RowIdx + 1 +} + +// PartitionSize returns the number of rows in the current partition. +func (wfr WindowFrameRun) PartitionSize() int { + return len(wfr.Rows) +} + +// unboundedFollowing returns the index of the "first row beyond" the partition +// so that current frame contains all the rows till the end of the partition. +func (wfr WindowFrameRun) unboundedFollowing() int { + return wfr.PartitionSize() +} + +// DefaultFrameSize returns the size of default window frame which contains +// the rows from the start of the partition through the last peer of the current row. +func (wfr WindowFrameRun) DefaultFrameSize() int { + return wfr.FirstPeerIdx + wfr.PeerRowCount } // FirstInPeerGroup returns if the current row is the first in its peer group. -func (wf WindowFrame) FirstInPeerGroup() bool { - return wf.RowIdx == wf.FirstPeerIdx +func (wfr WindowFrameRun) FirstInPeerGroup() bool { + return wfr.RowIdx == wfr.FirstPeerIdx } // Args returns the current argument set in the window frame. -func (wf WindowFrame) Args() Datums { - return wf.ArgsWithRowOffset(0) +func (wfr WindowFrameRun) Args() Datums { + return wfr.ArgsWithRowOffset(0) +} + +// ArgsWithRowOffset returns the argument set at the given offset in the window frame. +func (wfr WindowFrameRun) ArgsWithRowOffset(offset int) Datums { + return wfr.Rows[wfr.RowIdx+offset].Row[wfr.ArgIdxStart : wfr.ArgIdxStart+wfr.ArgCount] } -// ArgsWithRowOffset returns the argumnent set at the given offset in the window frame. -func (wf WindowFrame) ArgsWithRowOffset(offset int) Datums { - return wf.Rows[wf.RowIdx+offset].Row[wf.ArgIdxStart : wf.ArgIdxStart+wf.ArgCount] +// ArgsByRowIdx returns the argument set of the row at idx. +func (wfr WindowFrameRun) ArgsByRowIdx(idx int) Datums { + return wfr.Rows[idx].Row[wfr.ArgIdxStart : wfr.ArgIdxStart+wfr.ArgCount] } -// WindowFunc performs a computation on each row using data from a provided WindowFrame. +// WindowFunc performs a computation on each row using data from a provided WindowFrameRun. type WindowFunc interface { // Compute computes the window function for the provided window frame, given the // current state of WindowFunc. The method should be called sequentially for every @@ -77,7 +208,7 @@ type WindowFunc interface { // because there is an implicit carried dependency between each row and all those // that have come before it (like in an AggregateFunc). As such, this approach does // not present any exploitable associativity/commutativity for optimization. - Compute(context.Context, *EvalContext, WindowFrame) (Datum, error) + Compute(context.Context, *EvalContext, *WindowFrameRun) (Datum, error) // Close allows the window function to free any memory it requested during execution, // such as during the execution of an aggregation like CONCAT_AGG or ARRAY_AGG. diff --git a/pkg/sql/window.go b/pkg/sql/window.go index 79cd1dbaae94..27d303c23b28 100644 --- a/pkg/sql/window.go +++ b/pkg/sql/window.go @@ -20,8 +20,8 @@ import ( "sort" "unsafe" - "github.com/pkg/errors" - + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" @@ -152,12 +152,14 @@ type windowRun struct { windowValues [][]tree.Datum curRowIdx int + windowFrames []*tree.WindowFrame windowsAcc mon.BoundAccount } func (n *windowNode) startExec(params runParams) error { n.run.windowsAcc = params.EvalContext().Mon.MakeBoundAccount() + return nil } @@ -271,13 +273,14 @@ func (p *planner) constructWindowDefinitions( for _, windowDef := range sc.Window { name := string(windowDef.Name) if _, ok := namedWindowSpecs[name]; ok { - return errors.Errorf("window %q is already defined", name) + return pgerror.NewErrorf(pgerror.CodeWindowingError, "window %q is already defined", name) } namedWindowSpecs[name] = windowDef } + n.run.windowFrames = make([]*tree.WindowFrame, len(n.funcs)) // Construct window definitions for each window function application. - for _, windowFn := range n.funcs { + for idx, windowFn := range n.funcs { windowDef, err := constructWindowDef(*windowFn.expr.WindowDef, namedWindowSpecs) if err != nil { return err @@ -320,7 +323,7 @@ func (p *planner) constructWindowDefinitions( } } - windowFn.windowDef = windowDef + n.run.windowFrames[idx] = windowDef.Frame } return nil } @@ -352,7 +355,7 @@ func constructWindowDef( referencedSpec, ok := namedWindowSpecs[refName] if !ok { - return def, errors.Errorf("window %q does not exist", refName) + return def, pgerror.NewErrorf(pgerror.CodeUndefinedObjectError, "window %q does not exist", refName) } if !modifyRef { return *referencedSpec, nil @@ -360,17 +363,23 @@ func constructWindowDef( // referencedSpec.Partitions is always used. if len(def.Partitions) > 0 { - return def, errors.Errorf("cannot override PARTITION BY clause of window %q", refName) + return def, pgerror.NewErrorf(pgerror.CodeWindowingError, "cannot override PARTITION BY clause of window %q", refName) } def.Partitions = referencedSpec.Partitions // referencedSpec.OrderBy is used if set. if len(referencedSpec.OrderBy) > 0 { if len(def.OrderBy) > 0 { - return def, errors.Errorf("cannot override ORDER BY clause of window %q", refName) + return def, pgerror.NewErrorf(pgerror.CodeWindowingError, "cannot override ORDER BY clause of window %q", refName) } def.OrderBy = referencedSpec.OrderBy } + + if referencedSpec.Frame != nil { + return def, pgerror.NewErrorf(pgerror.CodeWindowingError, "cannot copy window %q because it has a frame clause", refName) + } + // TODO(yuzefovich): check the logic above, maybe we need to do or to check something else. + return def, nil } @@ -454,7 +463,7 @@ func (n *windowNode) replaceIndexVarsAndAggFuncs(s *renderNode) { } // The number of aggregation functions that need to be replaced with IndexedVars // is unknown, so we collect them here and bind them to an IndexedVarHelper later. - // We use a map indexed by render index to leverage addOrMergeRender's deduplication + // We use a map indexed by render index to leverage addOrReuseRender's deduplication // of identical aggregate functions. aggIVars := make(map[int]*tree.IndexedVar) @@ -564,6 +573,11 @@ type allPeers struct{} // allPeers implements the peerGroupChecker interface. func (allPeers) InSameGroup(i, j int) bool { return true } +type noPeers struct{} + +// noPeers implements the peerGroupChecker interface. +func (noPeers) InSameGroup(i, j int) bool { return false } + // peerGroupChecker can check if a pair of row indexes within a partition are // in the same peer group. type peerGroupChecker interface { @@ -609,6 +623,38 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte var scratchBytes []byte var scratchDatum []tree.Datum for windowIdx, windowFn := range n.funcs { + frameRun := &tree.WindowFrameRun{} + if n.run.windowFrames[windowIdx] != nil { + frameRun.Frame = n.run.windowFrames[windowIdx] + // OffsetExpr's must be integer expressions not containing any variables, aggregate functions, or window functions, + // so we need to make sure these expressions are evaluated before using offsets. + bounds := frameRun.Frame.Bounds + if bounds.StartBound.OffsetExpr != nil { + typedStartOffset := bounds.StartBound.OffsetExpr.(tree.TypedExpr) + dStartOffset, err := typedStartOffset.Eval(evalCtx) + if err != nil { + return err + } + startOffset := int(tree.MustBeDInt(dStartOffset)) + if startOffset < 0 { + return pgerror.NewErrorf(pgerror.CodeInvalidParameterValueError, "frame starting offset must not be negative") + } + frameRun.StartBoundOffset = startOffset + } + if bounds.EndBound != nil && bounds.EndBound.OffsetExpr != nil { + typedEndOffset := bounds.EndBound.OffsetExpr.(tree.TypedExpr) + dEndOffset, err := typedEndOffset.Eval(evalCtx) + if err != nil { + return err + } + endOffset := int(tree.MustBeDInt(dEndOffset)) + if endOffset < 0 { + return pgerror.NewErrorf(pgerror.CodeInvalidParameterValueError, "frame ending offset must not be negative") + } + frameRun.EndBoundOffset = endOffset + } + } + partitions := make(map[string][]tree.IndexedRow) if len(windowFn.partitionIdxs) == 0 { @@ -686,9 +732,10 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte builtin := windowFn.expr.GetWindowConstructor()(evalCtx) defer builtin.Close(ctx, evalCtx) - // Since we only support two types of window frames (see TODO above), we only - // need two possible types of peerGroupChecker's to help determine peer groups - // for given tuples. + // In order to calculate aggregates over a particular window frame, + // we need a way to 'reset' the aggregate, so this constructor will be used for that. + aggConstructor := windowFn.expr.GetAggregateConstructor() + var peerGrouper peerGroupChecker if windowFn.columnOrdering != nil { // If an ORDER BY clause is provided, order the partition and use the @@ -707,32 +754,39 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte // for functions with syntactically equivalent PARTITION BY and ORDER BY clauses. sort.Sort(sorter) peerGrouper = sorter + } else if frameRun.Frame != nil && frameRun.Frame.Mode == tree.ROWS { + // If ORDER BY clause is not provided and Frame is specified with ROWS mode, + // any row has no peers. + peerGrouper = noPeers{} } else { - // If no ORDER BY clause is provided, all rows in the partition are peers. + // If ORDER BY clause is not provided and either no Frame is provided or Frame is + // specified with RANGE mode, all rows are peers. peerGrouper = allPeers{} } - // Iterate over peer groups within partition using a window frame. - frame := tree.WindowFrame{ - Rows: partition, - ArgIdxStart: windowFn.argIdxStart, - ArgCount: windowFn.argCount, - RowIdx: 0, + frameRun.Rows = partition + frameRun.ArgIdxStart = windowFn.argIdxStart + frameRun.ArgCount = windowFn.argCount + frameRun.RowIdx = 0 + + if frameRun.Frame != nil { + builtins.AddAggregateConstructorToFramableAggregate(builtin, aggConstructor) } - for frame.RowIdx < len(partition) { + + for frameRun.RowIdx < len(partition) { // Compute the size of the current peer group. - frame.FirstPeerIdx = frame.RowIdx - frame.PeerRowCount = 1 - for ; frame.FirstPeerIdx+frame.PeerRowCount < len(partition); frame.PeerRowCount++ { - cur := frame.FirstPeerIdx + frame.PeerRowCount + frameRun.FirstPeerIdx = frameRun.RowIdx + frameRun.PeerRowCount = 1 + for ; frameRun.FirstPeerIdx+frameRun.PeerRowCount < frameRun.PartitionSize(); frameRun.PeerRowCount++ { + cur := frameRun.FirstPeerIdx + frameRun.PeerRowCount if !peerGrouper.InSameGroup(cur, cur-1) { break } } // Perform calculations on each row in the current peer group. - for ; frame.RowIdx < frame.FirstPeerIdx+frame.PeerRowCount; frame.RowIdx++ { - res, err := builtin.Compute(ctx, evalCtx, frame) + for ; frameRun.RowIdx < frameRun.FirstPeerIdx+frameRun.PeerRowCount; frameRun.RowIdx++ { + res, err := builtin.Compute(ctx, evalCtx, frameRun) if err != nil { return err } @@ -744,7 +798,7 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte } // Save result into n.run.windowValues, indexed by original row index. - valRowIdx := partition[frame.RowIdx].Idx + valRowIdx := partition[frameRun.RowIdx].Idx n.run.windowValues[valRowIdx][windowIdx] = res } } @@ -843,7 +897,7 @@ func (v *extractWindowFuncsVisitor) VisitPre(expr tree.Expr) (recurse bool, newE // Make sure this window function does not contain another window function. for _, argExpr := range t.Exprs { if v.subWindowVisitor.ContainsWindowFunc(argExpr) { - v.err = fmt.Errorf("window function calls cannot be nested under %s()", &t.Func) + v.err = pgerror.NewErrorf(pgerror.CodeWindowingError, "window function calls cannot be nested") return false, expr } } @@ -913,7 +967,6 @@ type windowFuncHolder struct { argIdxStart int // index of the window function's first arguments in window.wrappedValues argCount int // number of arguments taken by the window function - windowDef tree.WindowDef partitionIdxs []int columnOrdering sqlbase.ColumnOrdering }