From b004b4fff59a9ea5b1845fe8d5ece78cbf42759c Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Sat, 18 Feb 2023 12:11:35 -0500 Subject: [PATCH 1/3] minor: port more window tests --- datafusion/core/tests/sql/window.rs | 433 ------------------ .../tests/sqllogictests/test_files/window.slt | 281 ++++++++++++ datafusion/expr/src/window_frame.rs | 2 +- 3 files changed, 282 insertions(+), 434 deletions(-) diff --git a/datafusion/core/tests/sql/window.rs b/datafusion/core/tests/sql/window.rs index 7ef4af23a059..6c6573313a59 100644 --- a/datafusion/core/tests/sql/window.rs +++ b/datafusion/core/tests/sql/window.rs @@ -20,439 +20,6 @@ use ::parquet::arrow::arrow_writer::ArrowWriter; use ::parquet::file::properties::WriterProperties; use datafusion::execution::options::ReadOptions; -#[tokio::test] -async fn window_in_expression() -> Result<()> { - let ctx = SessionContext::new(); - let sql = "select 1 - lag(amount, 1) over (order by idx) as column1 from (values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount)"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+---------+", - "| column1 |", - "+---------+", - "| |", - "| -99 |", - "+---------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_with_agg_in_expression() -> Result<()> { - let ctx = SessionContext::new(); - let sql = "select col1, idx, count(*), sum(amount), lag(sum(amount), 1) over (order by idx) as prev_amount, - sum(amount) - lag(sum(amount), 1) over (order by idx) as difference from ( - select * from (values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount) - ) a - group by col1, idx;"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+------+-----+-----------------+---------------+-------------+------------+", - "| col1 | idx | COUNT(UInt8(1)) | SUM(a.amount) | prev_amount | difference |", - "+------+-----+-----------------+---------------+-------------+------------+", - "| a | 1 | 1 | 100 | | |", - "| a | 2 | 1 | 150 | 100 | 50 |", - "+------+-----+-----------------+---------------+-------------+------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_empty() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - SUM(c3) OVER() as sum1, \ - COUNT(*) OVER () as count1 \ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+------+--------+", - "| sum1 | count1 |", - "+------+--------+", - "| 781 | 100 |", - "| 781 | 100 |", - "| 781 | 100 |", - "| 781 | 100 |", - "| 781 | 100 |", - "+------+--------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_rows_preceding() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - SUM(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - AVG(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - COUNT(*) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------------+----------------------------+-----------------+", - "| SUM(aggregate_test_100.c4) | AVG(aggregate_test_100.c4) | COUNT(UInt8(1)) |", - "+----------------------------+----------------------------+-----------------+", - "| -48302 | -16100.666666666666 | 3 |", - "| 11243 | 3747.6666666666665 | 3 |", - "| -51311 | -17103.666666666668 | 3 |", - "| -2391 | -797 | 3 |", - "| 46756 | 15585.333333333334 | 3 |", - "+----------------------------+----------------------------+-----------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_rows_preceding_stddev_variance() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - VAR(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - VAR_POP(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - STDDEV(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - STDDEV_POP(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+---------------------------------+------------------------------------+-------------------------------+----------------------------------+", - "| VARIANCE(aggregate_test_100.c4) | VARIANCEPOP(aggregate_test_100.c4) | STDDEV(aggregate_test_100.c4) | STDDEVPOP(aggregate_test_100.c4) |", - "+---------------------------------+------------------------------------+-------------------------------+----------------------------------+", - "| 46721.33333333174 | 31147.555555554496 | 216.15118166073427 | 176.4867007894773 |", - "| 2639429.333333332 | 1759619.5555555548 | 1624.6320609089714 | 1326.5065229977404 |", - "| 746202.3333333324 | 497468.2222222216 | 863.8300372951455 | 705.3142719541563 |", - "| 768422.9999999981 | 512281.9999999988 | 876.5973990378925 | 715.7387791645767 |", - "| 66526.3333333288 | 44350.88888888587 | 257.9269922542594 | 210.5965073045749 |", - "+---------------------------------+------------------------------------+-------------------------------+----------------------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_rows_preceding_with_partition_unique_order_by() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - AVG(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - COUNT(*) OVER(PARTITION BY c2 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------------+----------------------------+-----------------+", - "| SUM(aggregate_test_100.c4) | AVG(aggregate_test_100.c4) | COUNT(UInt8(1)) |", - "+----------------------------+----------------------------+-----------------+", - "| -38611 | -19305.5 | 2 |", - "| 17547 | 8773.5 | 2 |", - "| -1301 | -650.5 | 2 |", - "| 26638 | 13319 | 3 |", - "| 26861 | 8953.666666666666 | 3 |", - "+----------------------------+----------------------------+-----------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} -/// The partition by clause conducts sorting according to given partition column by default. If the -/// sorting columns have non unique values, the unstable sorting may produce indeterminate results. -/// Therefore, we are commenting out the following test for now. - -// #[tokio::test] -// async fn window_frame_rows_preceding_with_non_unique_partition() -> Result<()> { -// let ctx = SessionContext::new(); -// register_aggregate_csv(&ctx).await?; -// let sql = "SELECT \ -// SUM(c4) OVER(PARTITION BY c1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ -// COUNT(*) OVER(PARTITION BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING)\ -// FROM aggregate_test_100 \ -// ORDER BY c9 \ -// LIMIT 5"; -// let actual = execute_to_batches(&ctx, sql).await; -// let expected = vec![ -// "+----------------------------+-----------------+", -// "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |", -// "+----------------------------+-----------------+", -// "| -33822 | 3 |", -// "| 20808 | 3 |", -// "| -29881 | 3 |", -// "| -47613 | 3 |", -// "| -13474 | 3 |", -// "+----------------------------+-----------------+", -// ]; -// assert_batches_eq!(expected, &actual); -// Ok(()) -// } - -#[tokio::test] -async fn window_frame_ranges_preceding_following_desc() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT \ - SUM(c4) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING),\ - SUM(c3) OVER(ORDER BY c2 DESC RANGE BETWEEN 10000 PRECEDING AND 10000 FOLLOWING),\ - COUNT(*) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) \ - FROM aggregate_test_100 \ - ORDER BY c9 \ - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------------+----------------------------+-----------------+", - "| SUM(aggregate_test_100.c4) | SUM(aggregate_test_100.c3) | COUNT(UInt8(1)) |", - "+----------------------------+----------------------------+-----------------+", - "| 52276 | 781 | 56 |", - "| 260620 | 781 | 63 |", - "| -28623 | 781 | 37 |", - "| 260620 | 781 | 63 |", - "| 260620 | 781 | 63 |", - "+----------------------------+----------------------------+-----------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_asc_desc_large() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT - SUM(c5) OVER (ORDER BY c2 ASC, c6 DESC) as sum1 - FROM aggregate_test_100 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------------+", - "| sum1 |", - "+-------------+", - "| -1383162419 |", - "| -3265456275 |", - "| -3909681744 |", - "| -5241214934 |", - "| -4246910946 |", - "+-------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_desc_large() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_csv(&ctx).await?; - let sql = "SELECT - SUM(c5) OVER (ORDER BY c2 DESC, c6 ASC) as sum1 - FROM aggregate_test_100 - ORDER BY c9 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-------------+", - "| sum1 |", - "+-------------+", - "| 11212193439 |", - "| 22799733943 |", - "| 2935356871 |", - "| 15810962683 |", - "| 18035025006 |", - "+-------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_timestamp_order_by() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - SUM(c1) OVER (ORDER BY c2 DESC) as summation1 - FROM null_cases - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+------------+", - "| summation1 |", - "+------------+", - "| 962 |", - "| 962 |", - "| 962 |", - "| 962 |", - "| 962 |", - "+------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_desc() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - COUNT(c2) OVER (ORDER BY c1 DESC RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) - FROM null_cases - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------+", - "| COUNT(null_cases.c2) |", - "+----------------------+", - "| 9 |", - "| 9 |", - "| 9 |", - "| 9 |", - "| 9 |", - "+----------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_asc() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - COUNT(c2) OVER (ORDER BY c1 RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) - FROM null_cases - ORDER BY c1 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------+", - "| COUNT(null_cases.c2) |", - "+----------------------+", - "| 2 |", - "| 2 |", - "| 2 |", - "| 2 |", - "| 5 |", - "+----------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_asc_null_first() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - COUNT(c2) OVER (ORDER BY c1 NULLS FIRST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) - FROM null_cases - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------+", - "| COUNT(null_cases.c2) |", - "+----------------------+", - "| 9 |", - "| 9 |", - "| 9 |", - "| 9 |", - "| 9 |", - "+----------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_order_by_null_desc_null_last() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - COUNT(c2) OVER (ORDER BY c1 DESC NULLS LAST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) - FROM null_cases - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+----------------------+", - "| COUNT(null_cases.c2) |", - "+----------------------+", - "| 5 |", - "| 5 |", - "| 5 |", - "| 6 |", - "| 6 |", - "+----------------------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - -#[tokio::test] -async fn window_frame_rows_order_by_null() -> Result<()> { - let ctx = SessionContext::new(); - register_aggregate_null_cases_csv(&ctx).await?; - let sql = "SELECT - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as b, - SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as c, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as d, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as e, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as f, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as g, - SUM(c1) OVER (ORDER BY c3) as h, - SUM(c1) OVER (ORDER BY c3 DESC) as i, - SUM(c1) OVER (ORDER BY c3 NULLS first) as j, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l, - SUM(c1) OVER (ORDER BY c3, c2) as m, - SUM(c1) OVER (ORDER BY c3, c1 DESC) as n, - SUM(c1) OVER (ORDER BY c3 DESC, c1) as o, - SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a1, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as b1, - SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as c1, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as d1, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as e1, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as f1, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as g1, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as h1, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as j1, - SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as k1, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as l1, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as m1, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as n1, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as o1, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as h11, - SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as j11, - SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as k11, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as l11, - SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as m11, - SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as n11, - SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as o11 - FROM null_cases - ORDER BY c3 - LIMIT 5"; - let actual = execute_to_batches(&ctx, sql).await; - let expected = vec![ - "+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+------+------+-----+-----+------+-----+-----+-----+------+-----+------+------+-----+-----+-----+------+-----+------+------+-----+------+------+-----+------+-----+-----+------+", - "| a | b | c | d | e | f | g | h | i | j | k | l | m | n | o | p | a1 | b1 | c1 | d1 | e1 | f1 | g1 | h1 | j1 | k1 | l1 | m1 | n1 | o1 | h11 | j11 | k11 | l11 | m11 | n11 | o11 |", - "+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+------+------+-----+-----+------+-----+-----+-----+------+-----+------+------+-----+-----+-----+------+-----+------+------+-----+------+------+-----+------+-----+-----+------+", - "| 412 | 412 | 339 | 412 | 339 | 339 | 412 | | 4627 | | 4627 | 4627 | | | 4627 | | 412 | 412 | 4627 | 412 | 4627 | 4627 | 412 | | | 4627 | | 4627 | 4627 | | 4627 | 4627 | | 4627 | | | 4627 |", - "| 488 | 488 | 412 | 488 | 412 | 412 | 488 | 72 | 4627 | 72 | 4627 | 4627 | 72 | 72 | 4627 | 72 | 488 | 488 | 4627 | 488 | 4627 | 4627 | 488 | 72 | 72 | 4627 | 72 | 4627 | 4627 | 72 | 4627 | 4627 | 72 | 4627 | 72 | 72 | 4627 |", - "| 543 | 543 | 488 | 543 | 488 | 488 | 543 | 96 | 4555 | 96 | 4555 | 4555 | 96 | 96 | 4555 | 96 | 543 | 543 | 4627 | 543 | 4627 | 4627 | 543 | 96 | 96 | 4555 | 96 | 4555 | 4555 | 96 | 4555 | 4555 | 96 | 4555 | 96 | 96 | 4555 |", - "| 553 | 553 | 543 | 553 | 543 | 543 | 553 | 115 | 4531 | 115 | 4531 | 4531 | 115 | 115 | 4531 | 115 | 553 | 553 | 4627 | 553 | 4627 | 4627 | 553 | 115 | 115 | 4531 | 115 | 4531 | 4531 | 115 | 4531 | 4531 | 115 | 4531 | 115 | 115 | 4531 |", - "| 553 | 553 | 553 | 553 | 553 | 553 | 553 | 140 | 4512 | 140 | 4512 | 4512 | 140 | 140 | 4512 | 140 | 553 | 553 | 4627 | 553 | 4627 | 4627 | 553 | 140 | 140 | 4512 | 140 | 4512 | 4512 | 140 | 4512 | 4512 | 140 | 4512 | 140 | 140 | 4512 |", - "+-----+-----+-----+-----+-----+-----+-----+-----+------+-----+------+------+-----+-----+------+-----+-----+-----+------+-----+------+------+-----+-----+-----+------+-----+------+------+-----+------+------+-----+------+-----+-----+------+", - ]; - assert_batches_eq!(expected, &actual); - Ok(()) -} - #[tokio::test] async fn window_frame_rows_preceding_with_unique_partition() -> Result<()> { let ctx = SessionContext::new(); diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 7c83c1b9b555..27ad459507b0 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -35,6 +35,16 @@ STORED AS CSV WITH HEADER ROW LOCATION '../../testing/data/csv/aggregate_test_100.csv' +statement ok +CREATE EXTERNAL TABLE null_cases( + c1 BIGINT NULL, + c2 DOUBLE NULL, + c3 BIGINT NULL +) +STORED AS CSV +WITH HEADER ROW +LOCATION 'tests/data/null_cases.csv'; + ### This is the same table as ### execute_with_partition with 4 partitions statement ok @@ -398,3 +408,274 @@ WITH _sample_data AS ( ---- aa 3 2 bb 7 2 + + +# async fn window_in_expression() -> Result<()> { +query I +select 1 - lag(amount, 1) over (order by idx) as column1 from (values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount) +--- +---- +NULL +-99 + + +# async fn window_with_agg_in_expression() -> Result<()> { +query TIIIII +select col1, idx, count(*), sum(amount), lag(sum(amount), 1) over (order by idx) as prev_amount, +sum(amount) - lag(sum(amount), 1) over (order by idx) as difference from ( +select * from (values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount) +) a +group by col1, idx +---- +a 1 1 100 NULL NULL +a 2 1 150 100 50 + + +# async fn window_frame_empty() -> Result<()> { +query II +SELECT +SUM(c3) OVER() as sum1, +COUNT(*) OVER () as count1 +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 +---- +781 100 +781 100 +781 100 +781 100 +781 100 + +# async fn window_frame_rows_preceding() -> Result<()> { +query IRI +SELECT +SUM(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +AVG(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +COUNT(*) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 +---- +-48302 -16100.666666666666 3 +11243 3747.666666666667 3 +-51311 -17103.666666666668 3 +-2391 -797 3 +46756 15585.333333333334 3 + + +# async fn window_frame_rows_preceding_stddev_variance() -> Result<()> { +query RRRR +SELECT +VAR(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +VAR_POP(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +STDDEV(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +STDDEV_POP(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 +---- +46721.33333333174 31147.555555554496 216.151181660734 176.486700789477 +2639429.333333332 1759619.5555555548 1624.632060908971 1326.50652299774 +746202.3333333324 497468.2222222216 863.830037295146 705.314271954156 +768422.9999999981 512281.9999999988 876.597399037893 715.738779164577 +66526.3333333288 44350.88888888587 257.926992254259 210.596507304575 + +# async fn window_frame_rows_preceding_with_partition_unique_order_by() -> Result<()> { +query IRI +SELECT +SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +AVG(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +COUNT(*) OVER(PARTITION BY c2 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 +---- +-38611 -19305.5 2 +17547 8773.5 2 +-1301 -650.5 2 +26638 13319 3 +26861 8953.666666666666 3 + +# /// The partition by clause conducts sorting according to given partition column by default. If the +# /// sorting columns have non unique values, the unstable sorting may produce indeterminate results. +# /// Therefore, we are commenting out the following test for now. + +#// #[tokio::test] +#// async fn window_frame_rows_preceding_with_non_unique_partition() -> Result<()> { +#// let ctx = SessionContext::new(); +#// register_aggregate_csv(&ctx).await?; +#// let sql = "SELECT +#// SUM(c4) OVER(PARTITION BY c1 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), +#// COUNT(*) OVER(PARTITION BY c2 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) +#// FROM aggregate_test_100 +#// ORDER BY c9 +#// LIMIT 5 +#// let actual = execute_to_batches(&ctx, sql).await; +#// let expected = vec![ +#// "+----------------------------+-----------------+", +#// "| SUM(aggregate_test_100.c4) | COUNT(UInt8(1)) |", +#// "+----------------------------+-----------------+", +#// "| -33822 | 3|", +#// "| 20808 | 3|", +#// "| -29881 | 3|", +#// "| -47613 | 3|", +#// "| -13474 | 3|", +#// "+----------------------------+-----------------+", +#// ]; +#// assert_batches_eq!(expected, &actual); +#// Ok(()) +#// } + +# async fn window_frame_ranges_preceding_following_desc() -> Result<()> { +query error DataFusion error: Internal error: Operator + is not implemented for types Int8(5) and Utf8("1"). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker +SELECT +SUM(c4) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING), +SUM(c3) OVER(ORDER BY c2 DESC RANGE BETWEEN 10000 PRECEDING AND 10000 FOLLOWING), +COUNT(*) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) +FROM aggregate_test_100 +ORDER BY c9 +LIMIT 5 + +# async fn window_frame_order_by_asc_desc_large() -> Result<()> { +query I +SELECT + SUM(c5) OVER (ORDER BY c2 ASC, c6 DESC) as sum1 + FROM aggregate_test_100 + LIMIT 5 +---- +-1383162419 +-3265456275 +-3909681744 +-5241214934 +-4246910946 + + +# async fn window_frame_order_by_desc_large() -> Result<()> { +query I +SELECT + SUM(c5) OVER (ORDER BY c2 DESC, c6 ASC) as sum1 + FROM aggregate_test_100 + ORDER BY c9 + LIMIT 5 +---- +11212193439 +22799733943 +2935356871 +15810962683 +18035025006 + +# async fn window_frame_order_by_null_timestamp_order_by() -> Result<()> { +query I +SELECT + SUM(c1) OVER (ORDER BY c2 DESC) as summation1 + FROM null_cases + LIMIT 5 +---- +962 +962 +962 +962 +962 + +# async fn window_frame_order_by_null_desc() -> Result<()> { +query I +SELECT + COUNT(c2) OVER (ORDER BY c1 DESC RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) + FROM null_cases + LIMIT 5 +---- +9 +9 +9 +9 +9 + +# async fn window_frame_order_by_null_asc() -> Result<()> { +query I +SELECT + COUNT(c2) OVER (ORDER BY c1 RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) + FROM null_cases + ORDER BY c1 + LIMIT 5 +---- +2 +2 +2 +2 +5 + +# async fn window_frame_order_by_null_asc_null_first() -> Result<()> { +query I +SELECT + COUNT(c2) OVER (ORDER BY c1 NULLS FIRST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) + FROM null_cases + LIMIT 5 +---- +9 +9 +9 +9 +9 + +# async fn window_frame_order_by_null_desc_null_last() -> Result<()> { +query I +SELECT + COUNT(c2) OVER (ORDER BY c1 DESC NULLS LAST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) + FROM null_cases + LIMIT 5 +---- +5 +5 +5 +6 +6 + +# async fn window_frame_rows_order_by_null() -> Result<()> { +query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII +SELECT + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as b, + SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as c, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as d, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as e, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as f, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as g, + SUM(c1) OVER (ORDER BY c3) as h, + SUM(c1) OVER (ORDER BY c3 DESC) as i, + SUM(c1) OVER (ORDER BY c3 NULLS first) as j, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first) as k, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last) as l, + SUM(c1) OVER (ORDER BY c3, c2) as m, + SUM(c1) OVER (ORDER BY c3, c1 DESC) as n, + SUM(c1) OVER (ORDER BY c3 DESC, c1) as o, + SUM(c1) OVER (ORDER BY c3, c1 NULLs first) as p, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as a1, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as b1, + SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as c1, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as d1, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as e1, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as f1, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND 11 FOLLOWING) as g1, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as h1, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as j1, + SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as k1, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as l1, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as m1, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as n1, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN UNBOUNDED PRECEDING AND current row) as o1, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as h11, + SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as j11, + SUM(c1) OVER (ORDER BY c3 DESC RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as k11, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as l11, + SUM(c1) OVER (ORDER BY c3 DESC NULLS last RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as m11, + SUM(c1) OVER (ORDER BY c3 DESC NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as n11, + SUM(c1) OVER (ORDER BY c3 NULLS first RANGE BETWEEN current row AND UNBOUNDED FOLLOWING) as o11 + FROM null_cases + ORDER BY c3 + LIMIT 5 +---- +412 412 339 412 339 339 412 NULL 4627 NULL 4627 4627 NULL NULL 4627 NULL 412 412 4627 412 4627 4627 412 NULL NULL 4627 NULL 4627 4627 NULL 4627 4627 NULL 4627 NULL NULL 4627 +488 488 412 488 412 412 488 72 4627 72 4627 4627 72 72 4627 72 488 488 4627 488 4627 4627 488 72 72 4627 72 4627 4627 72 4627 4627 72 4627 72 72 4627 +543 543 488 543 488 488 543 96 4555 96 4555 4555 96 96 4555 96 543 543 4627 543 4627 4627 543 96 96 4555 96 4555 4555 96 4555 4555 96 4555 96 96 4555 +553 553 543 553 543 543 553 115 4531 115 4531 4531 115 115 4531 115 553 553 4627 553 4627 4627 553 115 115 4531 115 4531 4531 115 4531 4531 115 4531 115 115 4531 +553 553 553 553 553 553 553 140 4512 140 4512 4512 140 140 4512 140 553 553 4627 553 4627 4627 553 140 140 4512 140 4512 4512 140 4512 4512 140 4512 140 140 4512 diff --git a/datafusion/expr/src/window_frame.rs b/datafusion/expr/src/window_frame.rs index c25d2491e45a..7794125d0eec 100644 --- a/datafusion/expr/src/window_frame.rs +++ b/datafusion/expr/src/window_frame.rs @@ -164,7 +164,7 @@ pub fn regularize(mut frame: WindowFrame, order_bys: usize) -> Result Date: Mon, 20 Feb 2023 05:38:39 -0700 Subject: [PATCH 2/3] Update comments and add link to ticket --- .../tests/sqllogictests/test_files/window.slt | 35 ++++++++++--------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 27ad459507b0..16f5eccc9c88 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -410,7 +410,7 @@ aa 3 2 bb 7 2 -# async fn window_in_expression() -> Result<()> { +# async fn window_in_expression query I select 1 - lag(amount, 1) over (order by idx) as column1 from (values ('a', 1, 100), ('a', 2, 150)) as t (col1, idx, amount) --- @@ -419,7 +419,7 @@ NULL -99 -# async fn window_with_agg_in_expression() -> Result<()> { +# async fn window_with_agg_in_expression query TIIIII select col1, idx, count(*), sum(amount), lag(sum(amount), 1) over (order by idx) as prev_amount, sum(amount) - lag(sum(amount), 1) over (order by idx) as difference from ( @@ -431,7 +431,7 @@ a 1 1 100 NULL NULL a 2 1 150 100 50 -# async fn window_frame_empty() -> Result<()> { +# async fn window_frame_empty query II SELECT SUM(c3) OVER() as sum1, @@ -446,7 +446,7 @@ LIMIT 5 781 100 781 100 -# async fn window_frame_rows_preceding() -> Result<()> { +# async fn window_frame_rows_preceding query IRI SELECT SUM(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), @@ -463,7 +463,7 @@ LIMIT 5 46756 15585.333333333334 3 -# async fn window_frame_rows_preceding_stddev_variance() -> Result<()> { +# async fn window_frame_rows_preceding_stddev_variance query RRRR SELECT VAR(c4) OVER(ORDER BY c4 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), @@ -480,7 +480,7 @@ LIMIT 5 768422.9999999981 512281.9999999988 876.597399037893 715.738779164577 66526.3333333288 44350.88888888587 257.926992254259 210.596507304575 -# async fn window_frame_rows_preceding_with_partition_unique_order_by() -> Result<()> { +# async fn window_frame_rows_preceding_with_partition_unique_order_by query IRI SELECT SUM(c4) OVER(PARTITION BY c1 ORDER BY c9 ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING), @@ -501,7 +501,7 @@ LIMIT 5 # /// Therefore, we are commenting out the following test for now. #// #[tokio::test] -#// async fn window_frame_rows_preceding_with_non_unique_partition() -> Result<()> { +#// async fn window_frame_rows_preceding_with_non_unique_partition #// let ctx = SessionContext::new(); #// register_aggregate_csv(&ctx).await?; #// let sql = "SELECT @@ -526,8 +526,9 @@ LIMIT 5 #// Ok(()) #// } -# async fn window_frame_ranges_preceding_following_desc() -> Result<()> { -query error DataFusion error: Internal error: Operator + is not implemented for types Int8(5) and Utf8("1"). This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker +# async fn window_frame_ranges_preceding_following_desc +# This query should pass. Tracked in https://github.com/apache/arrow-datafusion/issues/5346 +query error DataFusion error: Internal error: Operator + is not implemented SELECT SUM(c4) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING), SUM(c3) OVER(ORDER BY c2 DESC RANGE BETWEEN 10000 PRECEDING AND 10000 FOLLOWING), @@ -536,7 +537,7 @@ FROM aggregate_test_100 ORDER BY c9 LIMIT 5 -# async fn window_frame_order_by_asc_desc_large() -> Result<()> { +# async fn window_frame_order_by_asc_desc_large query I SELECT SUM(c5) OVER (ORDER BY c2 ASC, c6 DESC) as sum1 @@ -550,7 +551,7 @@ SELECT -4246910946 -# async fn window_frame_order_by_desc_large() -> Result<()> { +# async fn window_frame_order_by_desc_large query I SELECT SUM(c5) OVER (ORDER BY c2 DESC, c6 ASC) as sum1 @@ -564,7 +565,7 @@ SELECT 15810962683 18035025006 -# async fn window_frame_order_by_null_timestamp_order_by() -> Result<()> { +# async fn window_frame_order_by_null_timestamp_order_by query I SELECT SUM(c1) OVER (ORDER BY c2 DESC) as summation1 @@ -577,7 +578,7 @@ SELECT 962 962 -# async fn window_frame_order_by_null_desc() -> Result<()> { +# async fn window_frame_order_by_null_desc query I SELECT COUNT(c2) OVER (ORDER BY c1 DESC RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) @@ -590,7 +591,7 @@ SELECT 9 9 -# async fn window_frame_order_by_null_asc() -> Result<()> { +# async fn window_frame_order_by_null_asc query I SELECT COUNT(c2) OVER (ORDER BY c1 RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) @@ -604,7 +605,7 @@ SELECT 2 5 -# async fn window_frame_order_by_null_asc_null_first() -> Result<()> { +# async fn window_frame_order_by_null_asc_null_first query I SELECT COUNT(c2) OVER (ORDER BY c1 NULLS FIRST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) @@ -617,7 +618,7 @@ SELECT 9 9 -# async fn window_frame_order_by_null_desc_null_last() -> Result<()> { +# async fn window_frame_order_by_null_desc_null_last query I SELECT COUNT(c2) OVER (ORDER BY c1 DESC NULLS LAST RANGE BETWEEN 5 PRECEDING AND 3 FOLLOWING) @@ -630,7 +631,7 @@ SELECT 6 6 -# async fn window_frame_rows_order_by_null() -> Result<()> { +# async fn window_frame_rows_order_by_null query IIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII SELECT SUM(c1) OVER (ORDER BY c3 RANGE BETWEEN 10 PRECEDING AND 11 FOLLOWING) as a, From 704fd58833394fe87296735fe1ec39848b09a4b0 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 20 Feb 2023 05:40:20 -0700 Subject: [PATCH 3/3] escape --- datafusion/core/tests/sqllogictests/test_files/window.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/tests/sqllogictests/test_files/window.slt b/datafusion/core/tests/sqllogictests/test_files/window.slt index 16f5eccc9c88..cbbc82c91653 100644 --- a/datafusion/core/tests/sqllogictests/test_files/window.slt +++ b/datafusion/core/tests/sqllogictests/test_files/window.slt @@ -528,7 +528,7 @@ LIMIT 5 # async fn window_frame_ranges_preceding_following_desc # This query should pass. Tracked in https://github.com/apache/arrow-datafusion/issues/5346 -query error DataFusion error: Internal error: Operator + is not implemented +query error DataFusion error: Internal error: Operator \+ is not implemented SELECT SUM(c4) OVER(ORDER BY c2 DESC RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING), SUM(c3) OVER(ORDER BY c2 DESC RANGE BETWEEN 10000 PRECEDING AND 10000 FOLLOWING),