From b007aa20d153b81693cee3fd22454c94516d0aeb Mon Sep 17 00:00:00 2001 From: yuzefovich Date: Tue, 12 Jun 2018 20:12:16 -0400 Subject: [PATCH] sql: Add support for specifying window frames for window functions WIP on #26464: ROWS mode is fully supported whereas RANGE works only with UNBOUNDED PRECEDING/CURRENT ROW/UNBOUNDED FOLLOWING boundaries (same as in PostgreSQL). Current implementation of aggregate functions is naive (it simply computes the value of aggregate directly and discards all the previous computations which results in quadratic time). Release note (sql change): CockroachDB now supports custom frame specification for window functions using ROWS (fully-supported) and RANGE ('value' PRECEDING and 'value' FOLLOWING are not supported) modes. --- docs/generated/sql/bnf/stmt_block.bnf | 18 +- pkg/sql/logictest/testdata/logic_test/window | 211 ++++++++++++++++++- pkg/sql/parser/parse_test.go | 105 +++++++++ pkg/sql/parser/sql.y | 121 ++++++++++- pkg/sql/sem/builtins/aggregate_builtins.go | 4 +- pkg/sql/sem/builtins/window_builtins.go | 141 +++++++++---- pkg/sql/sem/tree/select.go | 122 ++++++++++- pkg/sql/sem/tree/type_check.go | 18 ++ pkg/sql/sem/tree/walk.go | 25 +++ pkg/sql/sem/tree/window_funs.go | 183 +++++++++++++--- pkg/sql/window.go | 111 +++++++--- 11 files changed, 937 insertions(+), 122 deletions(-) diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index b84575be73a7..b331bbdbf5c5 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -2017,7 +2017,7 @@ func_name ::= | prefixed_column_path window_specification ::= - '(' opt_existing_window_name opt_partition_clause opt_sort_clause ')' + '(' opt_existing_window_name opt_partition_clause opt_sort_clause opt_frame_clause ')' window_name ::= name @@ -2090,6 +2090,11 @@ opt_partition_clause ::= 'PARTITION' 'BY' expr_list | +opt_frame_clause ::= + 'RANGE' frame_extent + | 'ROWS' frame_extent + | + extract_list ::= 
extract_arg 'FROM' a_expr | expr_list @@ -2129,6 +2134,10 @@ rowsfrom_item ::= partition_name ::= unrestricted_name +frame_extent ::= + frame_bound + | 'BETWEEN' frame_bound 'AND' frame_bound + extract_arg ::= 'identifier' | 'YEAR' @@ -2146,3 +2155,10 @@ substr_from ::= substr_for ::= 'FOR' a_expr + +frame_bound ::= + 'UNBOUNDED' 'PRECEDING' + | 'UNBOUNDED' 'FOLLOWING' + | 'CURRENT' 'ROW' + | a_expr 'PRECEDING' + | a_expr 'FOLLOWING' diff --git a/pkg/sql/logictest/testdata/logic_test/window b/pkg/sql/logictest/testdata/logic_test/window index e38989451d52..7c73fa1cef67 100644 --- a/pkg/sql/logictest/testdata/logic_test/window +++ b/pkg/sql/logictest/testdata/logic_test/window @@ -282,7 +282,7 @@ SELECT rank() FROM kv query error unknown signature: rank\(int\) SELECT rank(22) FROM kv -query error window function calls cannot be nested under avg\(\) +query error window function calls cannot be nested SELECT avg(avg(k) OVER ()) OVER () FROM kv ORDER BY 1 query error OVER specified, but round\(\) is neither a window function nor an aggregate function @@ -1514,3 +1514,212 @@ SELECT max(i) * (1/j) * (row_number() OVER (ORDER BY max(i))) FROM (SELECT 1 AS # regression test for #23798 until #10495 is fixed. 
statement error function reserved for internal use SELECT final_variance(1.2, 1.2, 123) OVER (PARTITION BY k) FROM kv + + +statement ok +CREATE TABLE products ( + group_id serial PRIMARY KEY, + group_name VARCHAR (255) NOT NULL, + product_name VARCHAR (255) NOT NULL, + price DECIMAL (11, 2), + priceInt INT, + priceFloat FLOAT +) + +statement ok +INSERT INTO products (group_name, product_name, price, priceInt, priceFloat) VALUES +('Smartphone', 'Microsoft Lumia', 200, 200, 200), +('Smartphone', 'HTC One', 400, 400, 400), +('Smartphone', 'Nexus', 500, 500, 500), +('Smartphone', 'iPhone', 900, 900, 900), +('Laptop', 'HP Elite', 1200, 1200, 1200), +('Laptop', 'Lenovo Thinkpad', 700, 700, 700), +('Laptop', 'Sony VAIO', 700, 700, 700), +('Laptop', 'Dell', 800, 800, 800), +('Tablet', 'iPad', 700, 700, 700), +('Tablet', 'Kindle Fire', 150, 150, 150), +('Tablet', 'Samsung', 200, 200, 200) + +statement error cannot copy window "w" because it has a frame clause +SELECT price, max(price) OVER (w ORDER BY price) AS max_price FROM products WINDOW w AS (PARTITION BY price ROWS UNBOUNDED PRECEDING); + +statement error frame starting offset must not be negative +SELECT price, avg(price) OVER (PARTITION BY price ROWS -1 PRECEDING) AS avg_price FROM products; + +statement error frame ending offset must not be negative +SELECT price, avg(price) OVER (PARTITION BY price ROWS BETWEEN 1 FOLLOWING AND -1 FOLLOWING) AS avg_price FROM products; + +statement error frame ending offset must not be negative +SELECT product_name, price, min(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS min_over_three, max(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND -1 FOLLOWING) AS max_over_partition FROM products ORDER BY group_id; + +statement error incompatible window frame start type: decimal +SELECT avg(price) OVER (PARTITION BY group_name ROWS 1.5 PRECEDING) AS avg_price FROM products; + +statement error incompatible window frame start type: 
decimal +SELECT avg(price) OVER (PARTITION BY group_name ROWS BETWEEN 1.5 PRECEDING AND UNBOUNDED FOLLOWING) AS avg_price FROM products; + +statement error incompatible window frame end type: decimal +SELECT avg(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND 1.5 FOLLOWING) AS avg_price FROM products; + +query TRT +SELECT product_name, price, first_value(product_name) OVER w AS first FROM products WHERE price = 200 OR price = 700 WINDOW w as (PARTITION BY price ORDER BY product_name RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) ORDER BY price, product_name; +---- +Microsoft Lumia 200.00 Microsoft Lumia +Samsung 200.00 Microsoft Lumia +Lenovo Thinkpad 700.00 Lenovo Thinkpad +Sony VAIO 700.00 Lenovo Thinkpad +iPad 700.00 Lenovo Thinkpad + +query TRT +SELECT product_name, price, last_value(product_name) OVER w AS last FROM products WHERE price = 200 OR price = 700 WINDOW w as (PARTITION BY price ORDER BY product_name RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) ORDER BY price, product_name; +---- +Microsoft Lumia 200.00 Samsung +Samsung 200.00 Samsung +Lenovo Thinkpad 700.00 iPad +Sony VAIO 700.00 iPad +iPad 700.00 iPad + +query TRT +SELECT product_name, price, nth_value(product_name, 2) OVER w AS second FROM products WHERE price = 200 OR price = 700 WINDOW w as (PARTITION BY price ORDER BY product_name RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) ORDER BY price, product_name; +---- +Microsoft Lumia 200.00 Samsung +Samsung 200.00 NULL +Lenovo Thinkpad 700.00 Sony VAIO +Sony VAIO 700.00 iPad +iPad 700.00 NULL + +query TTRR +SELECT product_name, group_name, price, avg(price) OVER (PARTITION BY group_name ORDER BY price, product_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS avg_of_three FROM products ORDER BY group_name, price, product_name; +---- +Lenovo Thinkpad Laptop 700.00 700.00 +Sony VAIO Laptop 700.00 733.33333333333333333 +Dell Laptop 800.00 900.00 +HP Elite Laptop 1200.00 1000.00 +Microsoft Lumia 
Smartphone 200.00 300.00 +HTC One Smartphone 400.00 366.66666666666666667 +Nexus Smartphone 500.00 600.00 +iPhone Smartphone 900.00 700.00 +Kindle Fire Tablet 150.00 175.00 +Samsung Tablet 200.00 350.00 +iPad Tablet 700.00 450.00 + +query TTRR +SELECT product_name, group_name, price, avg(priceFloat) OVER (PARTITION BY group_name ORDER BY price, product_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS avg_of_three_floats FROM products ORDER BY group_name, price, product_name; +---- +Lenovo Thinkpad Laptop 700.00 700 +Sony VAIO Laptop 700.00 733.3333333333334 +Dell Laptop 800.00 900 +HP Elite Laptop 1200.00 1000 +Microsoft Lumia Smartphone 200.00 300 +HTC One Smartphone 400.00 366.6666666666667 +Nexus Smartphone 500.00 600 +iPhone Smartphone 900.00 700 +Kindle Fire Tablet 150.00 175 +Samsung Tablet 200.00 350 +iPad Tablet 700.00 450 + +query TTRR +SELECT product_name, group_name, price, avg(priceInt) OVER (PARTITION BY group_name ORDER BY price, product_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS avg_of_three_ints FROM products ORDER BY group_name, price, product_name; +---- +Lenovo Thinkpad Laptop 700.00 700 +Sony VAIO Laptop 700.00 733.33333333333333333 +Dell Laptop 800.00 900 +HP Elite Laptop 1200.00 1000 +Microsoft Lumia Smartphone 200.00 300 +HTC One Smartphone 400.00 366.66666666666666667 +Nexus Smartphone 500.00 600 +iPhone Smartphone 900.00 700 +Kindle Fire Tablet 150.00 175 +Samsung Tablet 200.00 350 +iPad Tablet 700.00 450 + +query TTRR +SELECT group_name, product_name, price, avg(price) OVER (PARTITION BY group_name ROWS (SELECT count(*) FROM PRODUCTS WHERE price = 200) PRECEDING) AS running_avg_of_three FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 200.00 +Smartphone HTC One 400.00 300.00 +Smartphone Nexus 500.00 366.66666666666666667 +Smartphone iPhone 900.00 600.00 +Laptop HP Elite 1200.00 1200.00 +Laptop Lenovo Thinkpad 700.00 950.00 +Laptop Sony VAIO 700.00 866.66666666666666667 +Laptop Dell 800.00 
733.33333333333333333 +Tablet iPad 700.00 700.00 +Tablet Kindle Fire 150.00 425.00 +Tablet Samsung 200.00 350.00 + +query TTRR +SELECT group_name, product_name, price, sum(price) OVER (PARTITION BY group_name ROWS 2 PRECEDING) AS running_sum FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 200.00 +Smartphone HTC One 400.00 600.00 +Smartphone Nexus 500.00 1100.00 +Smartphone iPhone 900.00 1800.00 +Laptop HP Elite 1200.00 1200.00 +Laptop Lenovo Thinkpad 700.00 1900.00 +Laptop Sony VAIO 700.00 2600.00 +Laptop Dell 800.00 2200.00 +Tablet iPad 700.00 700.00 +Tablet Kindle Fire 150.00 850.00 +Tablet Samsung 200.00 1050.00 + +query TTRT +SELECT group_name, product_name, price, array_agg(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND 2 FOLLOWING) AS array_agg_price FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 {200.00,400.00,500.00} +Smartphone HTC One 400.00 {200.00,400.00,500.00,900.00} +Smartphone Nexus 500.00 {400.00,500.00,900.00} +Smartphone iPhone 900.00 {500.00,900.00} +Laptop HP Elite 1200.00 {1200.00,700.00,700.00} +Laptop Lenovo Thinkpad 700.00 {1200.00,700.00,700.00,800.00} +Laptop Sony VAIO 700.00 {700.00,700.00,800.00} +Laptop Dell 800.00 {700.00,800.00} +Tablet iPad 700.00 {700.00,150.00,200.00} +Tablet Kindle Fire 150.00 {700.00,150.00,200.00} +Tablet Samsung 200.00 {150.00,200.00} + +query TTRR +SELECT group_name, product_name, price, avg(price) OVER (PARTITION BY group_name RANGE UNBOUNDED PRECEDING) AS avg_price FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 500.00 +Smartphone HTC One 400.00 500.00 +Smartphone Nexus 500.00 500.00 +Smartphone iPhone 900.00 500.00 +Laptop HP Elite 1200.00 850.00 +Laptop Lenovo Thinkpad 700.00 850.00 +Laptop Sony VAIO 700.00 850.00 +Laptop Dell 800.00 850.00 +Tablet iPad 700.00 350.00 +Tablet Kindle Fire 150.00 350.00 +Tablet Samsung 200.00 350.00 + +query TTRT +SELECT group_name, product_name, price, min(price) OVER 
(PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND 2 PRECEDING) AS min_over_empty_frame FROM products ORDER BY group_id; +---- +Smartphone Microsoft Lumia 200.00 NULL +Smartphone HTC One 400.00 NULL +Smartphone Nexus 500.00 NULL +Smartphone iPhone 900.00 NULL +Laptop HP Elite 1200.00 NULL +Laptop Lenovo Thinkpad 700.00 NULL +Laptop Sony VAIO 700.00 NULL +Laptop Dell 800.00 NULL +Tablet iPad 700.00 NULL +Tablet Kindle Fire 150.00 NULL +Tablet Samsung 200.00 NULL + + +query TRRR +SELECT product_name, price, min(price) OVER (PARTITION BY group_name ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS min_over_three, max(price) OVER (PARTITION BY group_name ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS max_over_partition FROM products ORDER BY group_id; +---- +Microsoft Lumia 200.00 200.00 900.00 +HTC One 400.00 200.00 900.00 +Nexus 500.00 400.00 900.00 +iPhone 900.00 500.00 900.00 +HP Elite 1200.00 700.00 1200.00 +Lenovo Thinkpad 700.00 700.00 1200.00 +Sony VAIO 700.00 700.00 1200.00 +Dell 800.00 700.00 1200.00 +iPad 700.00 150.00 700.00 +Kindle Fire 150.00 150.00 700.00 +Samsung 200.00 150.00 700.00 diff --git a/pkg/sql/parser/parse_test.go b/pkg/sql/parser/parse_test.go index 441cce990a27..afab2390d2e6 100644 --- a/pkg/sql/parser/parse_test.go +++ b/pkg/sql/parser/parse_test.go @@ -756,6 +756,34 @@ func TestParse(t *testing.T) { {`SELECT avg(1) OVER (PARTITION BY b ORDER BY c) FROM t`}, {`SELECT avg(1) OVER (w PARTITION BY b ORDER BY c) FROM t`}, + {`SELECT avg(1) OVER (ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ROWS 1 PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ROWS CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT 
avg(1) OVER (ROWS BETWEEN 1 PRECEDING AND 1 PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 PRECEDING AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND 1 FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (RANGE UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (RANGE CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (RANGE BETWEEN CURRENT ROW AND CURRENT ROW) FROM t`}, + {`SELECT avg(1) OVER (RANGE BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) FROM t`}, + {`SELECT avg(1) OVER (w ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (PARTITION BY b ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (ORDER BY c ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (PARTITION BY b ORDER BY c ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT avg(1) OVER (w PARTITION BY b ORDER BY c ROWS UNBOUNDED PRECEDING) FROM t`}, + {`SELECT a FROM t UNION SELECT 1 FROM t`}, {`SELECT a FROM t UNION SELECT 1 FROM t UNION SELECT 1 FROM t`}, {`SELECT a FROM t UNION ALL SELECT 1 FROM t`}, @@ -1989,6 +2017,83 @@ SELECT max(a ORDER BY b) FROM ab ^ HINT: See: https://github.com/cockroachdb/cockroach/issues/23620`, }, + { + `SELECT avg(1) OVER (RANGE 1 PRECEDING) FROM t`, + `RANGE PRECEDING is only supported with UNBOUNDED at or near "preceding" +SELECT avg(1) OVER (RANGE 1 PRECEDING) FROM t + ^ 
+`, + }, + { + `SELECT avg(1) OVER (RANGE BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) FROM t`, + `RANGE FOLLOWING is only supported with UNBOUNDED at or near "following" +SELECT avg(1) OVER (RANGE BETWEEN 1 FOLLOWING AND UNBOUNDED FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) FROM t`, + `RANGE PRECEDING is only supported with UNBOUNDED at or near "preceding" +SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND 1 PRECEDING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM t`, + `RANGE FOLLOWING is only supported with UNBOUNDED at or near "following" +SELECT avg(1) OVER (RANGE BETWEEN UNBOUNDED PRECEDING AND 1 FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS UNBOUNDED FOLLOWING) FROM t`, + `frame start cannot be UNBOUNDED FOLLOWING at or near "following" +SELECT avg(1) OVER (ROWS UNBOUNDED FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS 1 FOLLOWING) FROM t`, + `frame starting from following row cannot end with current row at or near "following" +SELECT avg(1) OVER (ROWS 1 FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED FOLLOWING AND UNBOUNDED FOLLOWING) FROM t`, + `frame start cannot be UNBOUNDED FOLLOWING at or near "following" +SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED FOLLOWING AND UNBOUNDED FOLLOWING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED PRECEDING) FROM t`, + `frame end cannot be UNBOUNDED PRECEDING at or near "preceding" +SELECT avg(1) OVER (ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED PRECEDING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND 1 PRECEDING) FROM t`, + `frame starting from current row cannot have preceding rows at or near "preceding" +SELECT avg(1) OVER (ROWS BETWEEN CURRENT ROW AND 1 PRECEDING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND 1 
PRECEDING) FROM t`, + `frame starting from following row cannot have preceding rows at or near "preceding" +SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND 1 PRECEDING) FROM t + ^ +`, + }, + { + `SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND CURRENT ROW) FROM t`, + `frame starting from following row cannot have preceding rows at or near "row" +SELECT avg(1) OVER (ROWS BETWEEN 1 FOLLOWING AND CURRENT ROW) FROM t + ^ +`, + }, } for _, d := range testData { _, err := Parse(d.sql) diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 7c1bc6ebb4f4..ee0dc508a2fb 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -315,6 +315,15 @@ func (u *sqlSymUnion) orders() []*tree.Order { func (u *sqlSymUnion) groupBy() tree.GroupBy { return u.val.(tree.GroupBy) } +func (u *sqlSymUnion) windowFrame() *tree.WindowFrame { + return u.val.(*tree.WindowFrame) +} +func (u *sqlSymUnion) windowFrameBounds() tree.WindowFrameBounds { + return u.val.(tree.WindowFrameBounds) +} +func (u *sqlSymUnion) windowFrameBound() *tree.WindowFrameBound { + return u.val.(*tree.WindowFrameBound) +} func (u *sqlSymUnion) distinctOn() tree.DistinctOn { return u.val.(tree.DistinctOn) } @@ -932,7 +941,9 @@ func newNameFromStr(s string) *tree.Name { %type <tree.Window> window_clause window_definition_list %type <*tree.WindowDef> window_definition over_clause window_specification %type <str> opt_existing_window_name -%type <empty> opt_frame_clause frame_extent frame_bound +%type <*tree.WindowFrame> opt_frame_clause +%type <tree.WindowFrameBounds> frame_extent +%type <*tree.WindowFrameBound> frame_bound %type <[]tree.ColumnID> opt_tableref_col_list tableref_col_list @@ -7148,6 +7159,7 @@ window_specification: RefName: tree.Name($2), Partitions: $3.exprs(), OrderBy: $4.orderBy(), + Frame: $5.windowFrame(), } } @@ -7182,23 +7194,110 @@ opt_partition_clause: // This is only a subset of the full SQL:2008 frame_clause grammar. We don't // support <window frame exclusion> yet. 
opt_frame_clause: - RANGE frame_extent { return unimplemented(sqllex, "frame range") } -| ROWS frame_extent { return unimplemented(sqllex, "frame rows") } -| /* EMPTY */ {} + RANGE frame_extent + { + bounds := $2.windowFrameBounds() + startBound := bounds.StartBound + endBound := bounds.EndBound + switch { + case startBound.BoundType == tree.ValuePreceding: + sqllex.Error("RANGE PRECEDING is only supported with UNBOUNDED") + return 1 + case startBound.BoundType == tree.ValueFollowing: + sqllex.Error("RANGE FOLLOWING is only supported with UNBOUNDED") + return 1 + case endBound != nil && endBound.BoundType == tree.ValuePreceding: + sqllex.Error("RANGE PRECEDING is only supported with UNBOUNDED") + return 1 + case endBound != nil && endBound.BoundType == tree.ValueFollowing: + sqllex.Error("RANGE FOLLOWING is only supported with UNBOUNDED") + return 1 + } + $$.val = &tree.WindowFrame{ + Mode: tree.RANGE, + Bounds: bounds, + } + } +| ROWS frame_extent + { + $$.val = &tree.WindowFrame{ + Mode: tree.ROWS, + Bounds: $2.windowFrameBounds(), + } + } +| /* EMPTY */ + { + $$.val = (*tree.WindowFrame)(nil) + } frame_extent: - frame_bound { return unimplemented(sqllex, "frame_extent") } -| BETWEEN frame_bound AND frame_bound { return unimplemented(sqllex, "frame_extent") } + frame_bound + { + startBound := $1.windowFrameBound() + switch { + case startBound.BoundType == tree.UnboundedFollowing: + sqllex.Error("frame start cannot be UNBOUNDED FOLLOWING") + return 1 + case startBound.BoundType == tree.ValueFollowing: + sqllex.Error("frame starting from following row cannot end with current row") + return 1 + } + $$.val = tree.WindowFrameBounds{StartBound: startBound} + } +| BETWEEN frame_bound AND frame_bound + { + startBound := $2.windowFrameBound() + endBound := $4.windowFrameBound() + switch { + case startBound.BoundType == tree.UnboundedFollowing: + sqllex.Error("frame start cannot be UNBOUNDED FOLLOWING") + return 1 + case endBound.BoundType == tree.UnboundedPreceding: + 
sqllex.Error("frame end cannot be UNBOUNDED PRECEDING") + return 1 + case startBound.BoundType == tree.CurrentRow && endBound.BoundType == tree.ValuePreceding: + sqllex.Error("frame starting from current row cannot have preceding rows") + return 1 + case startBound.BoundType == tree.ValueFollowing && endBound.BoundType == tree.ValuePreceding: + sqllex.Error("frame starting from following row cannot have preceding rows") + return 1 + case startBound.BoundType == tree.ValueFollowing && endBound.BoundType == tree.CurrentRow: + sqllex.Error("frame starting from following row cannot have preceding rows") + return 1 + } + $$.val = tree.WindowFrameBounds{StartBound: startBound, EndBound: endBound} + } // This is used for both frame start and frame end, with output set up on the // assumption it's frame start; the frame_extent productions must reject // invalid cases. frame_bound: - UNBOUNDED PRECEDING { return unimplemented(sqllex, "frame_bound") } -| UNBOUNDED FOLLOWING { return unimplemented(sqllex, "frame_bound") } -| CURRENT ROW { return unimplemented(sqllex, "frame_bound") } -| a_expr PRECEDING { return unimplemented(sqllex, "frame_bound") } -| a_expr FOLLOWING { return unimplemented(sqllex, "frame_bound") } + UNBOUNDED PRECEDING + { + $$.val = &tree.WindowFrameBound{BoundType: tree.UnboundedPreceding} + } +| UNBOUNDED FOLLOWING + { + $$.val = &tree.WindowFrameBound{BoundType: tree.UnboundedFollowing} + } +| CURRENT ROW + { + $$.val = &tree.WindowFrameBound{BoundType: tree.CurrentRow} + } +| a_expr PRECEDING + { + $$.val = &tree.WindowFrameBound{ + OffsetExpr: $1.expr(), + BoundType: tree.ValuePreceding, + } + } +| a_expr FOLLOWING + { + $$.val = &tree.WindowFrameBound{ + OffsetExpr: $1.expr(), + BoundType: tree.ValueFollowing, + } + } // Supporting nonterminals for expressions. 
diff --git a/pkg/sql/sem/builtins/aggregate_builtins.go b/pkg/sql/sem/builtins/aggregate_builtins.go index 50ed2eb923bf..da914c3f483a 100644 --- a/pkg/sql/sem/builtins/aggregate_builtins.go +++ b/pkg/sql/sem/builtins/aggregate_builtins.go @@ -156,7 +156,7 @@ var aggregates = map[string]builtinDefinition{ ReturnType: tree.FixedReturnType(types.Int), AggregateFunc: newCountRowsAggregate, WindowFunc: func(params []types.T, evalCtx *tree.EvalContext) tree.WindowFunc { - return newAggregateWindow(newCountRowsAggregate(params, evalCtx)) + return newFramableAggregateWindow(newCountRowsAggregate(params, evalCtx)) }, Info: "Calculates the number of rows.", }, @@ -318,7 +318,7 @@ func makeAggOverloadWithReturnType( ReturnType: retType, AggregateFunc: f, WindowFunc: func(params []types.T, evalCtx *tree.EvalContext) tree.WindowFunc { - return newAggregateWindow(f(params, evalCtx)) + return newFramableAggregateWindow(f(params, evalCtx)) }, Info: info, } diff --git a/pkg/sql/sem/builtins/window_builtins.go b/pkg/sql/sem/builtins/window_builtins.go index f19500b89ff6..76cf70408af1 100644 --- a/pkg/sql/sem/builtins/window_builtins.go +++ b/pkg/sql/sem/builtins/window_builtins.go @@ -125,6 +125,7 @@ func makeWindowOverload( } var _ tree.WindowFunc = &aggregateWindowFunc{} +var _ tree.WindowFunc = &framableAggregateWindowFunc{} var _ tree.WindowFunc = &rowNumberWindow{} var _ tree.WindowFunc = &rankWindow{} var _ tree.WindowFunc = &denseRankWindow{} @@ -143,21 +144,17 @@ type aggregateWindowFunc struct { peerRes tree.Datum } -func newAggregateWindow(agg tree.AggregateFunc) tree.WindowFunc { - return &aggregateWindowFunc{agg: agg} -} - func (w *aggregateWindowFunc) Compute( - ctx context.Context, evalCtx *tree.EvalContext, wf tree.WindowFrame, + ctx context.Context, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - if !wf.FirstInPeerGroup() { + if !wfr.FirstInPeerGroup() { return w.peerRes, nil } // Accumulate all values in the peer group at the same 
time, as these // must return the same value. - for i := 0; i < wf.PeerRowCount; i++ { - args := wf.ArgsWithRowOffset(i) + for i := 0; i < wfr.PeerRowCount; i++ { + args := wfr.ArgsWithRowOffset(i) var value tree.Datum // COUNT_ROWS takes no arguments. if len(args) > 0 { @@ -169,7 +166,6 @@ func (w *aggregateWindowFunc) Compute( } // Retrieve the value for the entire peer group, save it, and return it. - peerRes, err := w.agg.Result() if err != nil { return nil, err @@ -182,6 +178,71 @@ func (w *aggregateWindowFunc) Close(ctx context.Context, evalCtx *tree.EvalConte w.agg.Close(ctx) } +// framableAggregateWindowFunc is a wrapper around aggregateWindowFunc that allows +// resetting the aggregate by creating a new instance via a provided constructor. +type framableAggregateWindowFunc struct { + agg *aggregateWindowFunc + aggConstructor func(*tree.EvalContext) tree.AggregateFunc +} + +func newFramableAggregateWindow(agg tree.AggregateFunc) tree.WindowFunc { + return &framableAggregateWindowFunc{agg: &aggregateWindowFunc{agg: agg}} +} + +func (w *framableAggregateWindowFunc) Compute( + ctx context.Context, evalCtx *tree.EvalContext, wfr *tree.WindowFrameRun, +) (tree.Datum, error) { + if !wfr.FirstInPeerGroup() { + return w.agg.peerRes, nil + } + if w.aggConstructor == nil { + // No constructor is given, so we use the default approach. + return w.agg.Compute(ctx, evalCtx, wfr) + } + + // When aggConstructor is provided, we want to dispose of the old aggregate function + // and construct a new one for the computation. + w.agg.Close(ctx, evalCtx) + *w.agg = aggregateWindowFunc{w.aggConstructor(evalCtx), tree.DNull} + + // Accumulate all values in the window frame. + for i := wfr.FrameStartIdx(); i < wfr.FrameEndIdx(); i++ { + args := wfr.ArgsByRowIdx(i) + var value tree.Datum + // COUNT_ROWS takes no arguments. 
+ if len(args) > 0 { + value = args[0] + } + if err := w.agg.agg.Add(ctx, value); err != nil { + return nil, err + } + } + + // Retrieve the value for the window frame, save it, and return it. + peerRes, err := w.agg.agg.Result() + if err != nil { + return nil, err + } + w.agg.peerRes = peerRes + return w.agg.peerRes, nil +} + +func (w *framableAggregateWindowFunc) Close(ctx context.Context, evalCtx *tree.EvalContext) { + w.agg.Close(ctx, evalCtx) +} + +// AddAggregateConstructorToFramableAggregate adds the provided constructor to framableAggregateWindowFunc +// so that aggregates can be 'reset' when computing values over a window frame. +func AddAggregateConstructorToFramableAggregate( + windowFunc tree.WindowFunc, aggConstructor func(*tree.EvalContext) tree.AggregateFunc, +) { + // We only want to add aggConstructor to framableAggregateWindowFunc's since + // all non-aggregate builtins specific to window functions support framing "natively". + if framableAgg, ok := windowFunc.(*framableAggregateWindowFunc); ok { + framableAgg.aggConstructor = aggConstructor + } +} + // rowNumberWindow computes the number of the current row within its partition, // counting from 1. 
type rowNumberWindow struct{} @@ -191,9 +252,9 @@ func newRowNumberWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (rowNumberWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - return tree.NewDInt(tree.DInt(wf.RowIdx + 1 /* one-indexed */)), nil + return tree.NewDInt(tree.DInt(wfr.RowIdx + 1 /* one-indexed */)), nil } func (rowNumberWindow) Close(context.Context, *tree.EvalContext) {} @@ -208,10 +269,10 @@ func newRankWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (w *rankWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - if wf.FirstInPeerGroup() { - w.peerRes = tree.NewDInt(tree.DInt(wf.Rank())) + if wfr.FirstInPeerGroup() { + w.peerRes = tree.NewDInt(tree.DInt(wfr.Rank())) } return w.peerRes, nil } @@ -229,9 +290,9 @@ func newDenseRankWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (w *denseRankWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - if wf.FirstInPeerGroup() { + if wfr.FirstInPeerGroup() { w.denseRank++ w.peerRes = tree.NewDInt(tree.DInt(w.denseRank)) } @@ -253,16 +314,16 @@ func newPercentRankWindow([]types.T, *tree.EvalContext) tree.WindowFunc { var dfloatZero = tree.NewDFloat(0) func (w *percentRankWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { // Return zero if there's only one row, per spec. 
- if wf.RowCount() <= 1 { + if wfr.PartitionSize() <= 1 { return dfloatZero, nil } - if wf.FirstInPeerGroup() { + if wfr.FirstInPeerGroup() { // (rank - 1) / (total rows - 1) - w.peerRes = tree.NewDFloat(tree.DFloat(wf.Rank()-1) / tree.DFloat(wf.RowCount()-1)) + w.peerRes = tree.NewDFloat(tree.DFloat(wfr.Rank()-1) / tree.DFloat(wfr.PartitionSize()-1)) } return w.peerRes, nil } @@ -280,11 +341,11 @@ func newCumulativeDistWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (w *cumulativeDistWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - if wf.FirstInPeerGroup() { + if wfr.FirstInPeerGroup() { // (number of rows preceding or peer with current row) / (total rows) - w.peerRes = tree.NewDFloat(tree.DFloat(wf.FrameSize()) / tree.DFloat(wf.RowCount())) + w.peerRes = tree.NewDFloat(tree.DFloat(wfr.DefaultFrameSize()) / tree.DFloat(wfr.PartitionSize())) } return w.peerRes, nil } @@ -308,13 +369,13 @@ var errInvalidArgumentForNtile = pgerror.NewErrorf( pgerror.CodeInvalidParameterValueError, "argument of ntile() must be greater than zero") func (w *ntileWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { if w.ntile == nil { // If this is the first call to ntileWindow.Compute, set up the buckets. - total := wf.RowCount() + total := wfr.PartitionSize() - arg := wf.Args()[0] + arg := wfr.Args()[0] if arg == tree.DNull { // per spec: If argument is the null value, then the result is the null value. 
return tree.DNull, nil @@ -378,11 +439,11 @@ func makeLeadLagWindowConstructor( } func (w *leadLagWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { offset := 1 if w.withOffset { - offsetArg := wf.Args()[1] + offsetArg := wfr.Args()[1] if offsetArg == tree.DNull { return tree.DNull, nil } @@ -392,16 +453,16 @@ func (w *leadLagWindow) Compute( offset *= -1 } - if targetRow := wf.RowIdx + offset; targetRow < 0 || targetRow >= wf.RowCount() { + if targetRow := wfr.RowIdx + offset; targetRow < 0 || targetRow >= wfr.PartitionSize() { // Target row is out of the partition; supply default value if provided, // otherwise return NULL. if w.withDefault { - return wf.Args()[2], nil + return wfr.Args()[2], nil } return tree.DNull, nil } - return wf.ArgsWithRowOffset(offset)[0], nil + return wfr.ArgsWithRowOffset(offset)[0], nil } func (w *leadLagWindow) Close(context.Context, *tree.EvalContext) {} @@ -414,9 +475,9 @@ func newFirstValueWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (firstValueWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - return wf.Rows[0].Row[wf.ArgIdxStart], nil + return wfr.Rows[wfr.FrameStartIdx()].Row[wfr.ArgIdxStart], nil } func (firstValueWindow) Close(context.Context, *tree.EvalContext) {} @@ -429,9 +490,9 @@ func newLastValueWindow([]types.T, *tree.EvalContext) tree.WindowFunc { } func (lastValueWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - return wf.Rows[wf.FrameSize()-1].Row[wf.ArgIdxStart], nil + return wfr.Rows[wfr.FrameEndIdx()-1].Row[wfr.ArgIdxStart], nil } func (lastValueWindow) Close(context.Context, *tree.EvalContext) {} @@ -448,9 +509,9 @@ var 
errInvalidArgumentForNthValue = pgerror.NewErrorf( pgerror.CodeInvalidParameterValueError, "argument of nth_value() must be greater than zero") func (nthValueWindow) Compute( - _ context.Context, _ *tree.EvalContext, wf tree.WindowFrame, + _ context.Context, _ *tree.EvalContext, wfr *tree.WindowFrameRun, ) (tree.Datum, error) { - arg := wf.Args()[1] + arg := wfr.Args()[1] if arg == tree.DNull { return tree.DNull, nil } @@ -460,12 +521,10 @@ func (nthValueWindow) Compute( return nil, errInvalidArgumentForNthValue } - // per spec: Only consider the rows within the "window frame", which by default contains - // the rows from the start of the partition through the last peer of the current row. - if nth > wf.FrameSize() { + if nth > wfr.FrameSize() { return tree.DNull, nil } - return wf.Rows[nth-1].Row[wf.ArgIdxStart], nil + return wfr.Rows[wfr.FrameStartIdx()+nth-1].Row[wfr.ArgIdxStart], nil } func (nthValueWindow) Close(context.Context, *tree.EvalContext) {} diff --git a/pkg/sql/sem/tree/select.go b/pkg/sql/sem/tree/select.go index 53ae2f8a313d..3201eaeffac1 100644 --- a/pkg/sql/sem/tree/select.go +++ b/pkg/sql/sem/tree/select.go @@ -541,6 +541,18 @@ func (node *Limit) Format(ctx *FmtCtx) { } } +// RowsFromExpr represents a ROWS FROM(...) expression. +type RowsFromExpr struct { + Items Exprs +} + +// Format implements the NodeFormatter interface. +func (node *RowsFromExpr) Format(ctx *FmtCtx) { + ctx.WriteString("ROWS FROM (") + ctx.FormatNode(&node.Items) + ctx.WriteByte(')') +} + // Window represents a WINDOW clause. type Window []*WindowDef @@ -562,6 +574,7 @@ type WindowDef struct { RefName Name Partitions Exprs OrderBy OrderBy + Frame *WindowFrame } // Format implements the NodeFormatter interface. @@ -590,21 +603,108 @@ func (node *WindowDef) Format(ctx *FmtCtx) { ctx.WriteString(orderByStr[1:]) } needSpaceSeparator = true - _ = needSpaceSeparator // avoid compiler warning until TODO below is addressed. } - // TODO(nvanbenschoten): Support Window Frames. 
- // if node.Frame != nil {} - ctx.WriteByte(')') + if node.Frame != nil { + if needSpaceSeparator { + ctx.WriteRune(' ') + } + ctx.FormatNode(node.Frame) + } + ctx.WriteRune(')') } -// RowsFromExpr represents a ROWS FROM(...) expression. -type RowsFromExpr struct { - Items Exprs +// WindowFrameMode indicates which mode of framing is used. +type WindowFrameMode int + +const ( + // RANGE is the mode of specifying frame in terms of logical range (e.g. 100 units cheaper). + RANGE WindowFrameMode = iota + // ROWS is the mode of specifying frame in terms of physical offsets (e.g. 1 row before etc). + ROWS +) + +// WindowFrameBoundType indicates which type of boundary is used. +type WindowFrameBoundType int + +const ( + // UnboundedPreceding represents UNBOUNDED PRECEDING type of boundary. + UnboundedPreceding WindowFrameBoundType = iota + // ValuePreceding represents 'value' PRECEDING type of boundary. + ValuePreceding + // CurrentRow represents CURRENT ROW type of boundary. + CurrentRow + // ValueFollowing represents 'value' FOLLOWING type of boundary. + ValueFollowing + // UnboundedFollowing represents UNBOUNDED FOLLOWING type of boundary. + UnboundedFollowing +) + +// WindowFrameBound specifies the offset and the type of boundary. +type WindowFrameBound struct { + BoundType WindowFrameBoundType + OffsetExpr Expr +} + +// WindowFrameBounds specifies boundaries of the window frame. +// The row at StartBound is included whereas the row at EndBound is not. +type WindowFrameBounds struct { + StartBound *WindowFrameBound + EndBound *WindowFrameBound +} + +// WindowFrame represents static state of window frame over which calculations are made. 
+type WindowFrame struct { + Mode WindowFrameMode // the mode of framing being used + Bounds WindowFrameBounds // the bounds of the frame +} + +func (boundary *WindowFrameBound) write(ctx *FmtCtx) { + switch boundary.BoundType { + case UnboundedPreceding: + ctx.WriteString("UNBOUNDED PRECEDING") + case ValuePreceding: + ctx.FormatNode(boundary.OffsetExpr) + ctx.WriteString(" PRECEDING") + case CurrentRow: + ctx.WriteString("CURRENT ROW") + case ValueFollowing: + ctx.FormatNode(boundary.OffsetExpr) + ctx.WriteString(" FOLLOWING") + case UnboundedFollowing: + ctx.WriteString("UNBOUNDED FOLLOWING") + default: + panic("unexpected WindowFrameBoundType") + } } // Format implements the NodeFormatter interface. -func (node *RowsFromExpr) Format(ctx *FmtCtx) { - ctx.WriteString("ROWS FROM (") - ctx.FormatNode(&node.Items) - ctx.WriteByte(')') +func (wf *WindowFrame) Format(ctx *FmtCtx) { + switch wf.Mode { + case RANGE: + ctx.WriteString("RANGE ") + case ROWS: + ctx.WriteString("ROWS ") + default: + panic("unexpected WindowFrameMode") + } + if wf.Bounds.EndBound != nil { + ctx.WriteString("BETWEEN ") + wf.Bounds.StartBound.write(ctx) + ctx.WriteString(" AND ") + wf.Bounds.EndBound.write(ctx) + } else { + wf.Bounds.StartBound.write(ctx) + } +} + +// Copy returns a deep copy of wf. 
+func (wf *WindowFrame) Copy() *WindowFrame { + frameCopy := &WindowFrame{Mode: wf.Mode} + startBoundCopy := *wf.Bounds.StartBound + frameCopy.Bounds = WindowFrameBounds{&startBoundCopy, nil} + if wf.Bounds.EndBound != nil { + endBoundCopy := *wf.Bounds.EndBound + frameCopy.Bounds.EndBound = &endBoundCopy + } + return frameCopy } diff --git a/pkg/sql/sem/tree/type_check.go b/pkg/sql/sem/tree/type_check.go index 97362beb8cec..a5cb82df781c 100644 --- a/pkg/sql/sem/tree/type_check.go +++ b/pkg/sql/sem/tree/type_check.go @@ -758,6 +758,24 @@ func (expr *FuncExpr) TypeCheck(ctx *SemaContext, desired types.T) (TypedExpr, e } expr.WindowDef.OrderBy[i].Expr = typedOrderBy } + if expr.WindowDef.Frame != nil { + bounds := expr.WindowDef.Frame.Bounds + startBound, endBound := bounds.StartBound, bounds.EndBound + if startBound.OffsetExpr != nil { + typedStartOffsetExpr, err := typeCheckAndRequire(ctx, startBound.OffsetExpr, types.Int, "window frame start") + if err != nil { + return nil, err + } + startBound.OffsetExpr = typedStartOffsetExpr + } + if endBound != nil && endBound.OffsetExpr != nil { + typedEndOffsetExpr, err := typeCheckAndRequire(ctx, endBound.OffsetExpr, types.Int, "window frame end") + if err != nil { + return nil, err + } + endBound.OffsetExpr = typedEndOffsetExpr + } + } } else { // Make sure the window function builtins are used as window function applications. 
if def.Class == WindowClass { diff --git a/pkg/sql/sem/tree/walk.go b/pkg/sql/sem/tree/walk.go index 2fe659febe6d..d4c9dc29d25f 100644 --- a/pkg/sql/sem/tree/walk.go +++ b/pkg/sql/sem/tree/walk.go @@ -222,6 +222,9 @@ func (expr *FuncExpr) CopyNode() *FuncExpr { } windowDef.OrderBy = newOrderBy } + if windowDef.Frame != nil { + windowDef.Frame = windowDef.Frame.Copy() + } } return &exprCopy } @@ -260,6 +263,28 @@ func (expr *FuncExpr) Walk(v Visitor) Expr { ret.WindowDef.OrderBy[i].Expr = e } } + if expr.WindowDef.Frame != nil { + startBound, endBound := expr.WindowDef.Frame.Bounds.StartBound, expr.WindowDef.Frame.Bounds.EndBound + if startBound.OffsetExpr != nil { + e, changed := WalkExpr(v, startBound.OffsetExpr) + if changed { + if ret == expr { + ret = expr.CopyNode() + } + ret.WindowDef.Frame.Bounds.StartBound.OffsetExpr = e + } + } + if endBound != nil && endBound.OffsetExpr != nil { + e, changed := WalkExpr(v, endBound.OffsetExpr) + if changed { + if ret == expr { + ret = expr.CopyNode() + } + ret.WindowDef.Frame.Bounds.EndBound.OffsetExpr = e + } + } + } + } if expr.Filter != nil { e, changed := WalkExpr(v, expr.Filter) diff --git a/pkg/sql/sem/tree/window_funs.go b/pkg/sql/sem/tree/window_funs.go index bc7a89d885ca..e9ec8eba1f03 100644 --- a/pkg/sql/sem/tree/window_funs.go +++ b/pkg/sql/sem/tree/window_funs.go @@ -14,7 +14,9 @@ package tree -import "context" +import ( + "context" +) // IndexedRow is a row with a corresponding index. type IndexedRow struct { @@ -22,12 +24,15 @@ type IndexedRow struct { Row Datums } -// WindowFrame is a view into a subset of data over which calculations are made. -type WindowFrame struct { +// WindowFrameRun contains the runtime state of window frame during calculations. 
+type WindowFrameRun struct { // constant for all calls to WindowFunc.Add - Rows []IndexedRow - ArgIdxStart int // the index which arguments to the window function begin - ArgCount int // the number of window function arguments + Rows []IndexedRow + ArgIdxStart int // the index which arguments to the window function begin + ArgCount int // the number of window function arguments + Frame *WindowFrame // If non-nil, Frame represents the frame specification of this window. If nil, default frame is used. + StartBoundOffset int + EndBoundOffset int // changes for each row (each call to WindowFunc.Add) RowIdx int // the current row index @@ -37,39 +42,165 @@ type WindowFrame struct { PeerRowCount int // the number of rows in the current peer group } -// Rank returns the rank of this frame. -func (wf WindowFrame) Rank() int { - return wf.RowIdx + 1 +// FrameStartIdx returns the index of starting row in the frame (which is the first to be included). +func (wfr WindowFrameRun) FrameStartIdx() int { + if wfr.Frame == nil { + return 0 + } + switch wfr.Frame.Mode { + case RANGE: + switch wfr.Frame.Bounds.StartBound.BoundType { + case UnboundedPreceding: + return 0 + case ValuePreceding: + // TODO(yuzefovich): Currently, it is not supported, and this case should not be reached. + panic("unsupported WindowFrameBoundType in RANGE mode") + case CurrentRow: + // Spec: in RANGE mode CURRENT ROW means that the frame starts with the current row's first peer. + return wfr.FirstPeerIdx + case ValueFollowing: + // TODO(yuzefovich): Currently, it is not supported, and this case should not be reached. 
+ panic("unsupported WindowFrameBoundType in RANGE mode") + default: + panic("unexpected WindowFrameBoundType in RANGE mode") + } + case ROWS: + switch wfr.Frame.Bounds.StartBound.BoundType { + case UnboundedPreceding: + return 0 + case ValuePreceding: + idx := wfr.RowIdx - wfr.StartBoundOffset + if idx < 0 { + idx = 0 + } + return idx + case CurrentRow: + return wfr.RowIdx + case ValueFollowing: + idx := wfr.RowIdx + wfr.StartBoundOffset + if idx >= wfr.PartitionSize() { + idx = wfr.unboundedFollowing() + } + return idx + default: + panic("unexpected WindowFrameBoundType in ROWS mode") + } + default: + panic("unexpected WindowFrameMode") + } } -// RowCount returns the number of rows in this frame. -func (wf WindowFrame) RowCount() int { - return len(wf.Rows) +// FrameEndIdx returns the index of the first row after the frame. +func (wfr WindowFrameRun) FrameEndIdx() int { + if wfr.Frame == nil { + return wfr.DefaultFrameSize() + } + switch wfr.Frame.Mode { + case RANGE: + if wfr.Frame.Bounds.EndBound == nil { + // We're using default value of CURRENT ROW when EndBound is omitted. + // Spec: in RANGE mode CURRENT ROW means that the frame ends with the current row's last peer. + return wfr.DefaultFrameSize() + } + switch wfr.Frame.Bounds.EndBound.BoundType { + case ValuePreceding: + // TODO(yuzefovich): Currently, it is not supported, and this case should not be reached. + panic("unsupported WindowFrameBoundType in RANGE mode") + case CurrentRow: + return wfr.DefaultFrameSize() + case ValueFollowing: + // TODO(yuzefovich): Currently, it is not supported, and this case should not be reached. + panic("unsupported WindowFrameBoundType in RANGE mode") + case UnboundedFollowing: + return wfr.unboundedFollowing() + default: + panic("unexpected WindowFrameBoundType in RANGE mode") + } + case ROWS: + if wfr.Frame.Bounds.EndBound == nil { + // We're using default value of CURRENT ROW when EndBound is omitted. 
+ return wfr.RowIdx + 1 + } + switch wfr.Frame.Bounds.EndBound.BoundType { + case ValuePreceding: + idx := wfr.RowIdx - wfr.EndBoundOffset + 1 + if idx < 0 { + idx = 0 + } + return idx + case CurrentRow: + return wfr.RowIdx + 1 + case ValueFollowing: + idx := wfr.RowIdx + wfr.EndBoundOffset + 1 + if idx >= wfr.PartitionSize() { + idx = wfr.unboundedFollowing() + } + return idx + case UnboundedFollowing: + return wfr.unboundedFollowing() + default: + panic("unexpected WindowFrameBoundType in ROWS mode") + } + default: + panic("unexpected WindowFrameMode") + } } -// FrameSize returns the size of this frame. -// TODO(nvanbenschoten): This definition only holds while we don't support -// frame specification (RANGE or ROWS) in the OVER clause. -func (wf WindowFrame) FrameSize() int { - return wf.FirstPeerIdx + wf.PeerRowCount +// FrameSize returns the number of rows in the current frame. +func (wfr WindowFrameRun) FrameSize() int { + if wfr.Frame == nil { + return wfr.DefaultFrameSize() + } + size := wfr.FrameEndIdx() - wfr.FrameStartIdx() + if size <= 0 { + size = 0 + } + return size +} + +// Rank returns the rank of the current row. +func (wfr WindowFrameRun) Rank() int { + return wfr.RowIdx + 1 +} + +// PartitionSize returns the number of rows in the current partition. +func (wfr WindowFrameRun) PartitionSize() int { + return len(wfr.Rows) +} + +// unboundedFollowing returns the index of the "first row beyond" the partition +// so that current frame contains all the rows till the end of the partition. +func (wfr WindowFrameRun) unboundedFollowing() int { + return wfr.PartitionSize() +} + +// DefaultFrameSize returns the size of default window frame which contains +// the rows from the start of the partition through the last peer of the current row. +func (wfr WindowFrameRun) DefaultFrameSize() int { + return wfr.FirstPeerIdx + wfr.PeerRowCount } // FirstInPeerGroup returns if the current row is the first in its peer group. 
-func (wf WindowFrame) FirstInPeerGroup() bool { - return wf.RowIdx == wf.FirstPeerIdx +func (wfr WindowFrameRun) FirstInPeerGroup() bool { + return wfr.RowIdx == wfr.FirstPeerIdx } // Args returns the current argument set in the window frame. -func (wf WindowFrame) Args() Datums { - return wf.ArgsWithRowOffset(0) +func (wfr WindowFrameRun) Args() Datums { + return wfr.ArgsWithRowOffset(0) +} + +// ArgsWithRowOffset returns the argument set at the given offset in the window frame. +func (wfr WindowFrameRun) ArgsWithRowOffset(offset int) Datums { + return wfr.Rows[wfr.RowIdx+offset].Row[wfr.ArgIdxStart : wfr.ArgIdxStart+wfr.ArgCount] } -// ArgsWithRowOffset returns the argumnent set at the given offset in the window frame. -func (wf WindowFrame) ArgsWithRowOffset(offset int) Datums { - return wf.Rows[wf.RowIdx+offset].Row[wf.ArgIdxStart : wf.ArgIdxStart+wf.ArgCount] +// ArgsByRowIdx returns the argument set of the row at idx. +func (wfr WindowFrameRun) ArgsByRowIdx(idx int) Datums { + return wfr.Rows[idx].Row[wfr.ArgIdxStart : wfr.ArgIdxStart+wfr.ArgCount] } -// WindowFunc performs a computation on each row using data from a provided WindowFrame. +// WindowFunc performs a computation on each row using data from a provided WindowFrameRun. type WindowFunc interface { // Compute computes the window function for the provided window frame, given the // current state of WindowFunc. The method should be called sequentially for every @@ -77,7 +208,7 @@ type WindowFunc interface { // because there is an implicit carried dependency between each row and all those // that have come before it (like in an AggregateFunc). As such, this approach does // not present any exploitable associativity/commutativity for optimization. 
- Compute(context.Context, *EvalContext, WindowFrame) (Datum, error) + Compute(context.Context, *EvalContext, *WindowFrameRun) (Datum, error) // Close allows the window function to free any memory it requested during execution, // such as during the execution of an aggregation like CONCAT_AGG or ARRAY_AGG. diff --git a/pkg/sql/window.go b/pkg/sql/window.go index 79cd1dbaae94..27d303c23b28 100644 --- a/pkg/sql/window.go +++ b/pkg/sql/window.go @@ -20,8 +20,8 @@ import ( "sort" "unsafe" - "github.com/pkg/errors" - + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/sql/sem/transform" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sem/types" @@ -152,12 +152,14 @@ type windowRun struct { windowValues [][]tree.Datum curRowIdx int + windowFrames []*tree.WindowFrame windowsAcc mon.BoundAccount } func (n *windowNode) startExec(params runParams) error { n.run.windowsAcc = params.EvalContext().Mon.MakeBoundAccount() + return nil } @@ -271,13 +273,14 @@ func (p *planner) constructWindowDefinitions( for _, windowDef := range sc.Window { name := string(windowDef.Name) if _, ok := namedWindowSpecs[name]; ok { - return errors.Errorf("window %q is already defined", name) + return pgerror.NewErrorf(pgerror.CodeWindowingError, "window %q is already defined", name) } namedWindowSpecs[name] = windowDef } + n.run.windowFrames = make([]*tree.WindowFrame, len(n.funcs)) // Construct window definitions for each window function application. 
- for _, windowFn := range n.funcs { + for idx, windowFn := range n.funcs { windowDef, err := constructWindowDef(*windowFn.expr.WindowDef, namedWindowSpecs) if err != nil { return err @@ -320,7 +323,7 @@ func (p *planner) constructWindowDefinitions( } } - windowFn.windowDef = windowDef + n.run.windowFrames[idx] = windowDef.Frame } return nil } @@ -352,7 +355,7 @@ func constructWindowDef( referencedSpec, ok := namedWindowSpecs[refName] if !ok { - return def, errors.Errorf("window %q does not exist", refName) + return def, pgerror.NewErrorf(pgerror.CodeUndefinedObjectError, "window %q does not exist", refName) } if !modifyRef { return *referencedSpec, nil @@ -360,17 +363,23 @@ func constructWindowDef( // referencedSpec.Partitions is always used. if len(def.Partitions) > 0 { - return def, errors.Errorf("cannot override PARTITION BY clause of window %q", refName) + return def, pgerror.NewErrorf(pgerror.CodeWindowingError, "cannot override PARTITION BY clause of window %q", refName) } def.Partitions = referencedSpec.Partitions // referencedSpec.OrderBy is used if set. if len(referencedSpec.OrderBy) > 0 { if len(def.OrderBy) > 0 { - return def, errors.Errorf("cannot override ORDER BY clause of window %q", refName) + return def, pgerror.NewErrorf(pgerror.CodeWindowingError, "cannot override ORDER BY clause of window %q", refName) } def.OrderBy = referencedSpec.OrderBy } + + if referencedSpec.Frame != nil { + return def, pgerror.NewErrorf(pgerror.CodeWindowingError, "cannot copy window %q because it has a frame clause", refName) + } + // TODO(yuzefovich): check the logic above, maybe we need to do or to check something else. + return def, nil } @@ -454,7 +463,7 @@ func (n *windowNode) replaceIndexVarsAndAggFuncs(s *renderNode) { } // The number of aggregation functions that need to be replaced with IndexedVars // is unknown, so we collect them here and bind them to an IndexedVarHelper later. 
- // We use a map indexed by render index to leverage addOrMergeRender's deduplication + // We use a map indexed by render index to leverage addOrReuseRender's deduplication // of identical aggregate functions. aggIVars := make(map[int]*tree.IndexedVar) @@ -564,6 +573,11 @@ type allPeers struct{} // allPeers implements the peerGroupChecker interface. func (allPeers) InSameGroup(i, j int) bool { return true } +type noPeers struct{} + +// noPeers implements the peerGroupChecker interface. +func (noPeers) InSameGroup(i, j int) bool { return false } + // peerGroupChecker can check if a pair of row indexes within a partition are // in the same peer group. type peerGroupChecker interface { @@ -609,6 +623,38 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte var scratchBytes []byte var scratchDatum []tree.Datum for windowIdx, windowFn := range n.funcs { + frameRun := &tree.WindowFrameRun{} + if n.run.windowFrames[windowIdx] != nil { + frameRun.Frame = n.run.windowFrames[windowIdx] + // OffsetExpr's must be integer expressions not containing any variables, aggregate functions, or window functions, + // so we need to make sure these expressions are evaluated before using offsets. 
+ bounds := frameRun.Frame.Bounds + if bounds.StartBound.OffsetExpr != nil { + typedStartOffset := bounds.StartBound.OffsetExpr.(tree.TypedExpr) + dStartOffset, err := typedStartOffset.Eval(evalCtx) + if err != nil { + return err + } + startOffset := int(tree.MustBeDInt(dStartOffset)) + if startOffset < 0 { + return pgerror.NewErrorf(pgerror.CodeInvalidParameterValueError, "frame starting offset must not be negative") + } + frameRun.StartBoundOffset = startOffset + } + if bounds.EndBound != nil && bounds.EndBound.OffsetExpr != nil { + typedEndOffset := bounds.EndBound.OffsetExpr.(tree.TypedExpr) + dEndOffset, err := typedEndOffset.Eval(evalCtx) + if err != nil { + return err + } + endOffset := int(tree.MustBeDInt(dEndOffset)) + if endOffset < 0 { + return pgerror.NewErrorf(pgerror.CodeInvalidParameterValueError, "frame ending offset must not be negative") + } + frameRun.EndBoundOffset = endOffset + } + } + partitions := make(map[string][]tree.IndexedRow) if len(windowFn.partitionIdxs) == 0 { @@ -686,9 +732,10 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte builtin := windowFn.expr.GetWindowConstructor()(evalCtx) defer builtin.Close(ctx, evalCtx) - // Since we only support two types of window frames (see TODO above), we only - // need two possible types of peerGroupChecker's to help determine peer groups - // for given tuples. + // In order to calculate aggregates over a particular window frame, + // we need a way to 'reset' the aggregate, so this constructor will be used for that. + aggConstructor := windowFn.expr.GetAggregateConstructor() + var peerGrouper peerGroupChecker if windowFn.columnOrdering != nil { // If an ORDER BY clause is provided, order the partition and use the @@ -707,32 +754,39 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte // for functions with syntactically equivalent PARTITION BY and ORDER BY clauses. 
sort.Sort(sorter) peerGrouper = sorter + } else if frameRun.Frame != nil && frameRun.Frame.Mode == tree.ROWS { + // If ORDER BY clause is not provided and Frame is specified with ROWS mode, + // any row has no peers. + peerGrouper = noPeers{} } else { - // If no ORDER BY clause is provided, all rows in the partition are peers. + // If ORDER BY clause is not provided and either no Frame is provided or Frame is + // specified with RANGE mode, all rows are peers. peerGrouper = allPeers{} } - // Iterate over peer groups within partition using a window frame. - frame := tree.WindowFrame{ - Rows: partition, - ArgIdxStart: windowFn.argIdxStart, - ArgCount: windowFn.argCount, - RowIdx: 0, + frameRun.Rows = partition + frameRun.ArgIdxStart = windowFn.argIdxStart + frameRun.ArgCount = windowFn.argCount + frameRun.RowIdx = 0 + + if frameRun.Frame != nil { + builtins.AddAggregateConstructorToFramableAggregate(builtin, aggConstructor) } - for frame.RowIdx < len(partition) { + + for frameRun.RowIdx < len(partition) { // Compute the size of the current peer group. - frame.FirstPeerIdx = frame.RowIdx - frame.PeerRowCount = 1 - for ; frame.FirstPeerIdx+frame.PeerRowCount < len(partition); frame.PeerRowCount++ { - cur := frame.FirstPeerIdx + frame.PeerRowCount + frameRun.FirstPeerIdx = frameRun.RowIdx + frameRun.PeerRowCount = 1 + for ; frameRun.FirstPeerIdx+frameRun.PeerRowCount < frameRun.PartitionSize(); frameRun.PeerRowCount++ { + cur := frameRun.FirstPeerIdx + frameRun.PeerRowCount if !peerGrouper.InSameGroup(cur, cur-1) { break } } // Perform calculations on each row in the current peer group. 
- for ; frame.RowIdx < frame.FirstPeerIdx+frame.PeerRowCount; frame.RowIdx++ { - res, err := builtin.Compute(ctx, evalCtx, frame) + for ; frameRun.RowIdx < frameRun.FirstPeerIdx+frameRun.PeerRowCount; frameRun.RowIdx++ { + res, err := builtin.Compute(ctx, evalCtx, frameRun) if err != nil { return err } @@ -744,7 +798,7 @@ func (n *windowNode) computeWindows(ctx context.Context, evalCtx *tree.EvalConte } // Save result into n.run.windowValues, indexed by original row index. - valRowIdx := partition[frame.RowIdx].Idx + valRowIdx := partition[frameRun.RowIdx].Idx n.run.windowValues[valRowIdx][windowIdx] = res } } @@ -843,7 +897,7 @@ func (v *extractWindowFuncsVisitor) VisitPre(expr tree.Expr) (recurse bool, newE // Make sure this window function does not contain another window function. for _, argExpr := range t.Exprs { if v.subWindowVisitor.ContainsWindowFunc(argExpr) { - v.err = fmt.Errorf("window function calls cannot be nested under %s()", &t.Func) + v.err = pgerror.NewErrorf(pgerror.CodeWindowingError, "window function calls cannot be nested") return false, expr } } @@ -913,7 +967,6 @@ type windowFuncHolder struct { argIdxStart int // index of the window function's first arguments in window.wrappedValues argCount int // number of arguments taken by the window function - windowDef tree.WindowDef partitionIdxs []int columnOrdering sqlbase.ColumnOrdering }