Made NTH_VALUE(IGNORE NULLS) optional on Spark version.
mythrocks committed Oct 26, 2023
1 parent 11ec8db commit 5b8158a
Showing 1 changed file with 11 additions and 4 deletions.
integration_tests/src/main/python/window_function_test.py
@@ -497,11 +497,14 @@ def test_window_running_no_part(b_gen, batch_size):
         'max(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col',
         'FIRST(b) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls',
         'FIRST(b, TRUE) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls',
-        'NTH_VALUE(b, 1) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls',
-        'NTH_VALUE(b, 1) IGNORE NULLS OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls']
+        'NTH_VALUE(b, 1) OVER (ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls']
     if isinstance(b_gen.data_type, NumericType) and not isinstance(b_gen, FloatGen) and not isinstance(b_gen, DoubleGen):
         query_parts.append('sum(b) over (order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col')
 
+    if spark_version() > "3.1.1":
+        query_parts.append('NTH_VALUE(b, 1) IGNORE NULLS OVER '
+                           '(ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls')
+
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark : two_col_df(spark, UniqueLongGen(), b_gen, length=1024 * 14),
         "window_agg_table",
@@ -611,13 +614,17 @@ def test_window_running(b_gen, c_gen, batch_size):
         'max(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as max_col',
         'FIRST(c) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_keep_nulls',
         'FIRST(c, TRUE) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS first_ignore_nulls',
-        'NTH_VALUE(c, 1) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls',
-        'NTH_VALUE(c, 1) IGNORE NULLS OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls']
+        'NTH_VALUE(c, 1) OVER (PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_keep_nulls']
 
     # Decimal precision can grow too large. Float and Double can get odd results for Inf/-Inf because of ordering
     if isinstance(c_gen.data_type, NumericType) and (not isinstance(c_gen, FloatGen)) and (not isinstance(c_gen, DoubleGen)) and (not isinstance(c_gen, DecimalGen)):
         query_parts.append('sum(c) over (partition by b order by a rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_col')
 
+    # The option to IGNORE NULLS in NTH_VALUE is not available prior to Spark 3.1.2.
+    if spark_version() > "3.1.1":
+        query_parts.append('NTH_VALUE(c, 1) IGNORE NULLS OVER '
+                           '(PARTITION BY b ORDER BY a ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS nth_1_ignore_nulls')
+
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark : three_col_df(spark, UniqueLongGen(), RepeatSeqGen(b_gen, length=100), c_gen, length=1024 * 14),
         "window_agg_table",
