From 3db464b07a3074a4afc6033272892ee3b9a600ba Mon Sep 17 00:00:00 2001
From: MithunR
Date: Mon, 17 Jun 2024 15:53:00 -0700
Subject: [PATCH] Disable ANSI mode for window function tests.

Fixes #11019.

Window function tests fail on Spark 4.0 because of #5114 (and, more
broadly, #5120): spark-rapids does not support SUM, COUNT, and certain
other aggregations in ANSI mode.

This commit disables ANSI mode for the failing window function tests.
These may be revisited once error/overflow checking is available for
ANSI mode in spark-rapids.

Signed-off-by: MithunR
---
 .../src/main/python/window_function_test.py | 41 +++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py
index af8bbbb55b3..89499eae09b 100644
--- a/integration_tests/src/main/python/window_function_test.py
+++ b/integration_tests/src/main/python/window_function_test.py
@@ -165,6 +165,8 @@ def test_float_window_min_max_all_nans(data_gen):
             .withColumn("max_b", f.max('a').over(w))
     )
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order
 @pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
 def test_decimal128_count_window(data_gen):
@@ -177,6 +179,8 @@ def test_decimal128_count_window(data_gen):
         ' rows between 2 preceding and 10 following) as count_c_asc '
         'from window_agg_table')
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order
 @pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
 def test_decimal128_count_window_no_part(data_gen):
@@ -189,6 +193,8 @@ def test_decimal128_count_window_no_part(data_gen):
         ' rows between 2 preceding and 10 following) as count_b_asc '
         'from window_agg_table')
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order
 @pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
 def test_decimal_sum_window(data_gen):
@@ -201,6 +207,8 @@ def test_decimal_sum_window(data_gen):
         ' rows between 2 preceding and 10 following) as sum_c_asc '
         'from window_agg_table')
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order
 @pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
 def test_decimal_sum_window_no_part(data_gen):
@@ -214,6 +222,7 @@ def test_decimal_sum_window_no_part(data_gen):
         'from window_agg_table')
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order
 @pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
 def test_decimal_running_sum_window(data_gen):
@@ -227,6 +236,8 @@ def test_decimal_running_sum_window(data_gen):
         'from window_agg_table',
         conf = {'spark.rapids.sql.batchSizeBytes': '100'})
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order
 @pytest.mark.parametrize('data_gen', decimal_gens, ids=idfn)
 def test_decimal_running_sum_window_no_part(data_gen):
@@ -302,6 +313,7 @@ def test_window_aggs_for_ranges_numeric_long_overflow(data_gen):
         'from window_agg_table')
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # In a distributed setup the order of the partitions returned might be different, so we must ignore the order
 # but small batch sizes can make sort very slow, so do the final order by locally
 @ignore_order(local=True)
@@ -352,6 +364,7 @@ def test_window_aggs_for_range_numeric_date(data_gen, batch_size):
         conf = conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # In a distributed setup the order of the partitions returned might be different, so we must ignore the order
 # but small batch sizes can make sort very slow, so do the final order by locally
 @ignore_order(local=True)
@@ -396,6 +409,7 @@ def test_window_aggs_for_rows(data_gen, batch_size):
         conf = conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
 @pytest.mark.parametrize('data_gen', [
@@ -482,6 +496,8 @@ def test_window_batched_unbounded(b_gen, batch_size):
         validate_execs_in_gpu_plan = ['GpuCachedDoublePassWindowExec'],
         conf = conf)
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # This is for aggregations that work with a running window optimization. They don't need to be batched
 # specially, but it only works if all of the aggregations can support this.
 # the order returned should be consistent because the data ends up in a single task (no partitioning)
@@ -520,6 +536,7 @@ def test_rows_based_running_window_unpartitioned(b_gen, batch_size):
         conf = conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Testing multiple batch sizes.
 @pytest.mark.parametrize('a_gen', integral_gens + [string_gen, date_gen, timestamp_gen], ids=meta_idfn('data:'))
 @allow_non_gpu(*non_utc_allow)
@@ -694,6 +711,7 @@ def test_window_running_rank(data_gen):
         conf = conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # This is for aggregations that work with a running window optimization. They don't need to be batched
 # specially, but it only works if all of the aggregations can support this.
 # In a distributed setup the order of the partitions returned might be different, so we must ignore the order
@@ -738,6 +756,8 @@ def test_rows_based_running_window_partitioned(b_gen, c_gen, batch_size):
 
         conf = conf)
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Test different batch sizes.
 @pytest.mark.parametrize('part_gen', [int_gen, long_gen], ids=idfn) # Partitioning is not really the focus of the test.
@@ -805,6 +825,7 @@ def must_test_sum_aggregation(gen):
         conf=conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # Test that we can do a running window sum on floats and doubles and decimal. This becomes problematic because we do the agg in parallel
 # which means that the result can switch back and forth from Inf to not Inf depending on the order of aggregations.
 # We test this by limiting the range of the values in the sum to never hit Inf, and by using abs so we don't have
@@ -836,6 +857,7 @@ def test_window_running_float_decimal_sum(batch_size):
         conf = conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @approximate_float
 @ignore_order(local=True)
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # Test different batch sizes.
@@ -879,6 +901,7 @@ def window(oby_column):
         conf=conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # In a distributed setup the order of the partitions returned might be different, so we must ignore the order
 # but small batch sizes can make sort very slow, so do the final order by locally
 @ignore_order(local=True)
@@ -1000,6 +1023,7 @@ def test_window_aggs_for_rows_lead_lag_on_arrays(a_gen, b_gen, c_gen, d_gen):
         ''')
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # lead and lag don't currently work for string columns, so redo the tests, but just for strings
 # without lead and lag
 # In a distributed setup the order of the partitions returned might be different, so we must ignore the order
@@ -1107,6 +1131,8 @@ def test_window_aggs_lag_ignore_nulls_fallback(a_gen, b_gen, c_gen, d_gen):
         FROM window_agg_table
         ''')
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # Test for RANGE queries, with timestamp order-by expressions.
 # In a distributed setup the order of the partitions returned might be different, so we must ignore the order
 # but small batch sizes can make sort very slow, so do the final order by locally
@@ -1155,6 +1181,7 @@ def test_window_aggs_for_ranges_timestamps(data_gen):
         conf = {'spark.rapids.sql.castFloatToDecimal.enabled': True})
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # In a distributed setup the order of the partitions returned might be different, so we must ignore the order
 # but small batch sizes can make sort very slow, so do the final order by locally
 @ignore_order(local=True)
@@ -1201,6 +1228,7 @@ def test_window_aggregations_for_decimal_and_float_ranges(data_gen):
         conf={})
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # In a distributed setup the order of the partitions returned might be different, so we must ignore the order
 # but small batch sizes can make sort very slow, so do the final order by locally
 @ignore_order(local=True)
@@ -1306,6 +1334,7 @@ def test_window_aggs_for_rows_collect_list():
         conf={'spark.rapids.sql.window.collectList.enabled': True})
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # SortExec does not support array type, so sort the result locally.
 @ignore_order(local=True)
 # This test is more directed at Databricks and their running window optimization instead of ours
@@ -1347,6 +1376,8 @@ def test_running_window_function_exec_for_all_aggs():
         ''',
         conf={'spark.rapids.sql.window.collectList.enabled': True})
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 # Test the Databricks WindowExec which combines a WindowExec with a ProjectExec and provides the output
 # fields that we need to handle with an extra GpuProjectExec and we need the input expressions to compute
 # a window function of another window function case
@@ -1668,6 +1699,8 @@ def do_it(spark):
 
     assert_gpu_fallback_collect(do_it, 'WindowExec')
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 # single-level structs (no nested structs) are now supported by the plugin
 @pytest.mark.parametrize('part_gen', [StructGen([["a", long_gen]])], ids=meta_idfn('partBy:'))
@@ -1731,6 +1764,8 @@ def do_it(spark):
 
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
+
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order
 def test_unbounded_to_unbounded_window():
     # This is specifically to test a bug that caused overflow issues when calculating
@@ -1784,6 +1819,7 @@ def test_window_first_last_nth_ignore_nulls(data_gen):
         'FROM window_agg_table')
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @tz_sensitive_test
 @allow_non_gpu(*non_supported_tz_allow)
 @ignore_order(local=True)
@@ -1825,6 +1861,7 @@ def test_to_date_with_window_functions():
     )
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 @approximate_float
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
@@ -1881,6 +1918,7 @@ def spark_bugs_in_decimal_sorting():
     return v < "3.1.4" or v < "3.3.1" or v < "3.2.3" or v < "3.4.0"
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 @approximate_float
 @pytest.mark.parametrize('batch_size', ['1g'], ids=idfn)
@@ -1925,6 +1963,7 @@ def test_window_aggs_for_negative_rows_unpartitioned(data_gen, batch_size):
         conf=conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
 @pytest.mark.parametrize('data_gen', [
@@ -1964,6 +2003,7 @@ def test_window_aggs_for_batched_finite_row_windows_partitioned(data_gen, batch_
         conf=conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
 @pytest.mark.parametrize('data_gen', [
@@ -2003,6 +2043,7 @@ def test_window_aggs_for_batched_finite_row_windows_unpartitioned(data_gen, batc
         conf=conf)
 
 
+@ansi_mode_disabled # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', [_grpkey_int_with_nulls,], ids=idfn)
 def test_window_aggs_for_batched_finite_row_windows_fallback(data_gen):
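For readers unfamiliar with the decorator used throughout this patch: @ansi_mode_disabled
is defined elsewhere in the spark-rapids integration-test framework, not in this diff. The
sketch below is only an illustration of how such a pytest-level switch could be wired up.
The helper is_ansi_mode_enabled(), the SPARK_ANSI_MODE environment variable, the skipif-based
implementation, and the example test are assumptions made for this illustration; they are not
taken from the plugin's actual code.

# Illustrative sketch only -- not the actual spark-rapids definition of @ansi_mode_disabled.
import os

import pytest


def is_ansi_mode_enabled():
    # Hypothetical helper: in the real test framework the ANSI setting would come from the
    # Spark session / test configuration; an environment variable stands in for it here.
    return os.environ.get('SPARK_ANSI_MODE', 'false').lower() == 'true'


# Skip a test entirely when the suite runs with spark.sql.ansi.enabled=true, mirroring the
# intent of the @ansi_mode_disabled annotations added in the diff above.
ansi_mode_disabled = pytest.mark.skipif(
    is_ansi_mode_enabled(),
    reason='SUM, COUNT, and certain other window aggregations are not yet supported in '
           'ANSI mode (https://github.com/NVIDIA/spark-rapids/issues/5114)')


@ansi_mode_disabled
def test_example_running_sum_window():
    ...  # test body omitted; only the marker usage is being illustrated

Depending on what the framework supports, a similar effect could also be achieved by
overriding spark.sql.ansi.enabled to false in each test's conf instead of skipping; either
way, a single decorator keeps that choice in one place for all affected tests.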