From d896b565e04956fc3b73d8122b181fec5e426e8d Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Mon, 17 Jul 2023 17:32:38 -0700
Subject: [PATCH 1/9] added a test for schema pruning for orc

---
 integration_tests/src/main/python/orc_test.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/integration_tests/src/main/python/orc_test.py b/integration_tests/src/main/python/orc_test.py
index 7262b279b60..b4afcd38ede 100644
--- a/integration_tests/src/main/python/orc_test.py
+++ b/integration_tests/src/main/python/orc_test.py
@@ -880,3 +880,11 @@ def test_orc_column_name_with_dots(spark_tmp_path, reader_confs):
     assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`"), conf=all_confs)
     assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`.`c.d.e`.`f.g`"), conf=all_confs)
+
+# https://github.com/NVIDIA/spark-rapids/issues/8712
+def test_select_single_complex_field_array(spark_tmp_path):
+    data_path = spark_tmp_path + "/ORC_DATA"
+    data_gen = ArrayGen(StructGen([('first', StringGen()),('last', StringGen())]), max_length=10, nullable=False)
+    with_cpu_session(lambda spark: gen_df(spark, data_gen).write.orc(data_path))
+
+    assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.orc(data_path).selectExpr("value.last", "value"))
\ No newline at end of file

From 939bbc99e09538d2f13a9134bdaad56126e3413b Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Mon, 17 Jul 2023 17:35:29 -0700
Subject: [PATCH 2/9] added a test for schema pruning for parquet

---
 integration_tests/src/main/python/parquet_test.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index b6261b2295c..8db972deb48 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -1527,3 +1527,11 @@ def test_parquet_column_name_with_dots(spark_tmp_path, reader_confs):
     assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`"), conf=all_confs)
     assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`.`c.d.e`.`f.g`"), conf=all_confs)
+
+# https://github.com/NVIDIA/spark-rapids/issues/8712
+def test_select_single_complex_field_array(spark_tmp_path):
+    data_path = spark_tmp_path + "/PARQUET_DATA"
+    data_gen = ArrayGen(StructGen([('first', StringGen()),('last', StringGen())]), max_length=10, nullable=False)
+    with_cpu_session(lambda spark: gen_df(spark, data_gen).write.parquet(data_path))
+
+    assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.parquet(data_path).selectExpr("value.last", "value"))
\ No newline at end of file

From faf6a52ac46275601fe7836b5ee340ed80f985e7 Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Tue, 18 Jul 2023 12:00:40 -0700
Subject: [PATCH 3/9] signing off

Signed-off-by: Raza Jafri
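The ORC and Parquet patches above add the same regression test: select a single nested field out of an array-of-structs column together with the full column, and compare the GPU result against the CPU result. A rough standalone sketch of that read pattern in plain PySpark, without the RAPIDS integration-test harness (the path and rows below are made up for illustration), looks like this:

    # Illustrative sketch only; not part of the patches above.
    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.appName("nested-field-sketch").getOrCreate()

    rows = [Row(value=[Row(first="Ada", last="Lovelace"), Row(first="Alan", last="Turing")]),
            Row(value=[Row(first="Grace", last="Hopper")])]
    spark.createDataFrame(rows).write.mode("overwrite").orc("/tmp/nested_field_sketch")

    # "value.last" extracts one field from every array element (an array<string>),
    # while "value" still needs the whole struct, so the scan must not over-prune.
    df = spark.read.orc("/tmp/nested_field_sketch").selectExpr("value.last", "value")
    df.show(truncate=False)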
From 8bd952365e6db5826692e51d17581a10b39be48a Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Fri, 21 Jul 2023 17:40:21 -0700
Subject: [PATCH 4/9] Add schema pruning tests

---
 .../src/main/python/parquet_test.py           |  8 --
 .../python/prune_partition_column_test.py     | 78 ++++++++++++++++++-
 .../rapids/ExecutionPlanCaptureCallback.scala | 32 +++++++-
 3 files changed, 106 insertions(+), 12 deletions(-)

diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index 8db972deb48..b6261b2295c 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -1527,11 +1527,3 @@ def test_parquet_column_name_with_dots(spark_tmp_path, reader_confs):
     assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`"), conf=all_confs)
     assert_gpu_and_cpu_are_equal_collect(lambda spark: reader(spark).selectExpr("`a.b`.`c.d.e`.`f.g`"), conf=all_confs)
-
-# https://github.com/NVIDIA/spark-rapids/issues/8712
-def test_select_single_complex_field_array(spark_tmp_path):
-    data_path = spark_tmp_path + "/PARQUET_DATA"
-    data_gen = ArrayGen(StructGen([('first', StringGen()),('last', StringGen())]), max_length=10, nullable=False)
-    with_cpu_session(lambda spark: gen_df(spark, data_gen).write.parquet(data_path))
-
-    assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.parquet(data_path).selectExpr("value.last", "value"))
\ No newline at end of file
diff --git a/integration_tests/src/main/python/prune_partition_column_test.py b/integration_tests/src/main/python/prune_partition_column_test.py
index f68c32f4d4b..3ad66f1a7c7 100644
--- a/integration_tests/src/main/python/prune_partition_column_test.py
+++ b/integration_tests/src/main/python/prune_partition_column_test.py
@@ -15,10 +15,11 @@
 import os
 import pytest

-from asserts import assert_gpu_and_cpu_are_equal_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, run_with_cpu_and_gpu, assert_equal
 from data_gen import *
 from marks import *
-from spark_session import with_cpu_session
+from spark_session import with_cpu_session, is_before_spark_320
+from conftest import spark_jvm

 # Several values to avoid generating too many folders for partitions.
 part1_gen = SetValuesGen(IntegerType(), [-10, -1, 0, 1, 10])
@@ -127,3 +128,76 @@ def test_prune_partition_column_when_filter_fallback_project(spark_tmp_path, pru
                                                              filter_col, file_format):
     do_prune_partition_column_when_filter_project(spark_tmp_path, prune_part_enabled, file_format,
                                                   filter_col, gpu_project_enabled=False)
+
+def create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, func, conf):
+    full_name_type = StructGen([('first', StringGen()), ('middle', StringGen()), ('last', StringGen())])
+    name_type = StructGen([('first', StringGen()), ('last', StringGen())])
+    contacts_data_gen = StructGen([
+        ('id', IntegerGen()),
+        ('full_name', full_name_type),
+        ('address', StringGen()),
+        ('friends', ArrayGen(full_name_type, max_length=10, nullable=False))], nullable=False)
+
+    brief_contacts_data_gen = StructGen([
+        ('id', IntegerGen()),
+        ('name', name_type),
+        ('address', StringGen())], nullable=False)
+
+    def contact_gen_df(spark, data_gen, partition):
+        gen = gen_df(spark, data_gen)
+        if is_partitioned:
+            return gen.withColumn('p', f.lit(partition))
+        else:
+            return gen
+
+    with_cpu_session(lambda spark: contact_gen_df(spark, contacts_data_gen, 1).write.format(format).save(data_path + "/contacts/p=1"))
+    with_cpu_session(lambda spark: contact_gen_df(spark, brief_contacts_data_gen, 2).write.format(format).save(data_path + "/contacts/p=2"))
+
+    (from_cpu, cpu_df), (from_gpu, gpu_df) = run_with_cpu_and_gpu(
+        func,
+        'COLLECT_WITH_DATAFRAME',
+        conf=conf)
+
+    jvm = spark_jvm()
+    jvm.org.apache.spark.sql.rapids.ExecutionPlanCaptureCallback.assertSchemataMatch(cpu_df._jdf, gpu_df._jdf, expected_schemata)
+    assert_equal(from_cpu, from_gpu)
+
+# https://github.com/NVIDIA/spark-rapids/issues/8712
+# https://github.com/NVIDIA/spark-rapids/issues/8714
+@pytest.mark.parametrize('query_and_expected_schemata', [("select friends.middle, friends from contacts where p=1", "struct<friends:array<struct<first:string,middle:string,last:string>>>"),
+                                                         ("select name.first from contacts where name.first = 'Jane'", "struct<name:struct<first:string>>")])
+@pytest.mark.parametrize('vectorized', ["true", "false"])
+@pytest.mark.parametrize('is_partitioned', [True, False])
+@pytest.mark.parametrize('format', ["parquet", "orc"])
+def test_select_complex_field(format, spark_tmp_path, query_and_expected_schemata, vectorized, is_partitioned):
+    query, expected_schemata = query_and_expected_schemata
+    data_path = spark_tmp_path + "/DATA"
+    def read_temp_view(spark):
+        schema = "`id` INT, `name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " + \
+                 "`address` STRING,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`p` INT"
+        spark.read.format(format).schema(schema).load(data_path + "/contacts").createOrReplaceTempView("contacts")
+        return spark.sql(query)
+    conf={"spark.sql.parquet.enableVectorizedReader": vectorized}
+    create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf)
+
+# https://github.com/NVIDIA/spark-rapids/issues/8715
+@pytest.mark.parametrize('select_and_expected_schemata', [("friend.First", "struct<friends:array<struct<first:string>>>"),
+                                                          ("friend.MIDDLE", "struct<friends:array<struct<middle:string>>>")])
+@pytest.mark.parametrize('vectorized', ["true", "false"])
+@pytest.mark.skipif(is_before_spark_320(), reason='https://issues.apache.org/jira/browse/SPARK-34638')
+@pytest.mark.parametrize('is_partitioned', [True, False])
+@pytest.mark.parametrize('format', ["parquet", "orc"])
+def test_nested_column_prune_on_generator_output(format, spark_tmp_path, select_and_expected_schemata, vectorized, is_partitioned):
+    query, expected_schemata = select_and_expected_schemata
+    data_path = spark_tmp_path + "/DATA"
+    def read_temp_view(spark):
+        schema = "`id` INT, `name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " + \
+                 "`address` STRING,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`p` INT"
+        spark.read.format(format).schema(schema).load(data_path + "/contacts").createOrReplaceTempView("contacts")
+        return spark.table("contacts").select(f.explode(f.col("friends")).alias("friend")).select(query)
+
+    conf = {"spark.sql.caseSensitive": "false",
+            "spark.sql.parquet.enableVectorizedReader": vectorized}
+    create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf)
\ No newline at end of file
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala
index fa85f7612af..14ecce5dc18 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala
@@ -25,11 +25,11 @@ import com.nvidia.spark.rapids.{PlanShims, PlanUtils}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.{ExecSubqueryExpression, QueryExecution, ReusedSubqueryExec, SparkPlan}
-import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper, QueryStageExec}
 import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
 import org.apache.spark.sql.util.QueryExecutionListener

-object ExecutionPlanCaptureCallback {
+object ExecutionPlanCaptureCallback extends AdaptiveSparkPlanHelper {
   private[this] var shouldCapture: Boolean = false
   private[this] val execPlans: ArrayBuffer[SparkPlan] = ArrayBuffer.empty

@@ -83,6 +83,34 @@
     fallbackCpuClassList.foreach(fallbackCpuClass => assertDidFallBack(gpuPlans, fallbackCpuClass))
   }

+  def assertSchemataMatch(cpuDf: DataFrame, gpuDf: DataFrame, expectedSchema: String): Unit = {
+    import org.apache.spark.sql.execution.FileSourceScanExec
+    import org.apache.spark.sql.types.StructType
+    import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+
+    val cpuFileSourceScanSchemata = collect(cpuDf.queryExecution.executedPlan) {
+      case scan: FileSourceScanExec => scan.requiredSchema
+    }
+    val gpuFileSourceScanSchemata = collect(gpuDf.queryExecution.executedPlan) {
+      case scan: GpuFileSourceScanExec => scan.requiredSchema
+    }
+    assert(cpuFileSourceScanSchemata.size == gpuFileSourceScanSchemata.size,
+      s"Found ${cpuFileSourceScanSchemata.size} file sources in dataframe, " +
+        s"but expected ${gpuFileSourceScanSchemata.size}")
+
+    cpuFileSourceScanSchemata.zip(gpuFileSourceScanSchemata).foreach {
+      case (cpuScanSchema, gpuScanSchema) =>
+        cpuScanSchema match {
+          case otherType: StructType =>
+            assert(gpuScanSchema.sameType(otherType))
+            val expectedStructType = CatalystSqlParser.parseDataType(expectedSchema)
+            assert(gpuScanSchema.sameType(expectedStructType))
+            assert(cpuScanSchema.sameType(expectedStructType))
+          case _ => assert(false)
+        }
+    }
+  }
+
   def assertCapturedAndGpuFellBack(fallbackCpuClass: String, timeoutMs: Long = 2000): Unit = {
     val gpuPlans = getResultsWithTimeout(timeoutMs = timeoutMs)
     assert(gpuPlans.nonEmpty, "Did not capture a plan")

From 302f20fc6bfc2adb0d658f3e7016b1ed3830b458 Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Mon, 24 Jul 2023 18:22:35 -0700
Subject: [PATCH 5/9] add test for missing complex field

---
 .../src/main/python/prune_partition_column_test.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/integration_tests/src/main/python/prune_partition_column_test.py b/integration_tests/src/main/python/prune_partition_column_test.py
index 3ad66f1a7c7..341f03c0c97 100644
--- a/integration_tests/src/main/python/prune_partition_column_test.py
+++ b/integration_tests/src/main/python/prune_partition_column_test.py
@@ -134,7 +134,7 @@ def create_contacts_table_and_read(is_partitioned, format, data_path, expected_s
     name_type = StructGen([('first', StringGen()), ('last', StringGen())])
     contacts_data_gen = StructGen([
         ('id', IntegerGen()),
-        ('full_name', full_name_type),
+        ('name', full_name_type),
         ('address', StringGen()),
         ('friends', ArrayGen(full_name_type, max_length=10, nullable=False))], nullable=False)

@@ -163,8 +163,10 @@ def contact_gen_df(spark, data_gen, partition):
     assert_equal(from_cpu, from_gpu)

 # https://github.com/NVIDIA/spark-rapids/issues/8712
+# https://github.com/NVIDIA/spark-rapids/issues/8713
 # https://github.com/NVIDIA/spark-rapids/issues/8714
 @pytest.mark.parametrize('query_and_expected_schemata', [("select friends.middle, friends from contacts where p=1", "struct<friends:array<struct<first:string,middle:string,last:string>>>"),
+                                                         pytest.param(("select name.middle, address from contacts where p=2", "struct<name:struct<middle:string>,address:string>"), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8788')),
                                                          ("select name.first from contacts where name.first = 'Jane'", "struct<name:struct<first:string>>")])
 @pytest.mark.parametrize('vectorized', ["true", "false"])
 @pytest.mark.parametrize('is_partitioned', [True, False])
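The expected schema strings in the parametrized tests above describe the pruned read schema of the file scan, which assertSchemataMatch verifies on both the CPU and GPU plans. The same pruning can be observed outside the harness from the query plan; a small sketch in plain PySpark (made-up path and rows) is:

    # Illustrative sketch only; shows the pruned ReadSchema the expected strings describe.
    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.appName("read-schema-sketch").getOrCreate()

    contacts = [Row(id=1,
                    name=Row(first="Jane", middle="X", last="Doe"),
                    address="123 Main St",
                    friends=[Row(first="Susan", middle="Z", last="Smith")])]
    spark.createDataFrame(contacts).write.mode("overwrite").parquet("/tmp/contacts_sketch")

    # With nested schema pruning (on by default in Spark 3.x), selecting only
    # name.first collapses the scan's ReadSchema to struct<name:struct<first:string>>.
    df = spark.read.parquet("/tmp/contacts_sketch").selectExpr("name.first")
    df.explain()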
From 5e311669d76f286bdcde4164da92f304b76313be Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Tue, 25 Jul 2023 18:02:55 -0700
Subject: [PATCH 6/9] addressed review comments

---
 .../python/prune_partition_column_test.py     | 41 ++++++++++---------
 .../rapids/ExecutionPlanCaptureCallback.scala | 14 +++++--
 2 files changed, 32 insertions(+), 23 deletions(-)

diff --git a/integration_tests/src/main/python/prune_partition_column_test.py b/integration_tests/src/main/python/prune_partition_column_test.py
index 341f03c0c97..0142f529ce0 100644
--- a/integration_tests/src/main/python/prune_partition_column_test.py
+++ b/integration_tests/src/main/python/prune_partition_column_test.py
@@ -18,6 +18,7 @@
 from asserts import assert_gpu_and_cpu_are_equal_collect, run_with_cpu_and_gpu, assert_equal
 from data_gen import *
 from marks import *
+from pyspark.sql.types import IntegerType
 from spark_session import with_cpu_session, is_before_spark_320
 from conftest import spark_jvm

@@ -143,6 +144,8 @@ def create_contacts_table_and_read(is_partitioned, format, data_path, expected_s
         ('name', name_type),
         ('address', StringGen())], nullable=False)

+    # We are adding the field 'p' twice just like it is being done in Spark tests
+    # https://github.com/apache/spark/blob/85e252e8503534009f4fb5ea005d44c9eda31447/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala#L193
     def contact_gen_df(spark, data_gen, partition):
         gen = gen_df(spark, data_gen)
         if is_partitioned:
@@ -153,8 +156,11 @@ def contact_gen_df(spark, data_gen, partition):
     with_cpu_session(lambda spark: contact_gen_df(spark, contacts_data_gen, 1).write.format(format).save(data_path + "/contacts/p=1"))
     with_cpu_session(lambda spark: contact_gen_df(spark, brief_contacts_data_gen, 2).write.format(format).save(data_path + "/contacts/p=2"))

+    # Schema to read in.
+    read_schema = contacts_data_gen.data_type.add("p", IntegerType(), True) if is_partitioned else contacts_data_gen.data_type
+
     (from_cpu, cpu_df), (from_gpu, gpu_df) = run_with_cpu_and_gpu(
-        func,
+        func(read_schema),
         'COLLECT_WITH_DATAFRAME',
         conf=conf)

@@ -168,38 +174,33 @@ def contact_gen_df(spark, data_gen, partition):
 @pytest.mark.parametrize('query_and_expected_schemata', [("select friends.middle, friends from contacts where p=1", "struct<friends:array<struct<first:string,middle:string,last:string>>>"),
                                                          pytest.param(("select name.middle, address from contacts where p=2", "struct<name:struct<middle:string>,address:string>"), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8788')),
                                                          ("select name.first from contacts where name.first = 'Jane'", "struct<name:struct<first:string>>")])
-@pytest.mark.parametrize('vectorized', ["true", "false"])
 @pytest.mark.parametrize('is_partitioned', [True, False])
 @pytest.mark.parametrize('format', ["parquet", "orc"])
-def test_select_complex_field(format, spark_tmp_path, query_and_expected_schemata, vectorized, is_partitioned):
+def test_select_complex_field(format, spark_tmp_path, query_and_expected_schemata, is_partitioned):
     query, expected_schemata = query_and_expected_schemata
     data_path = spark_tmp_path + "/DATA"
-    def read_temp_view(spark):
-        schema = "`id` INT, `name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " + \
-                 "`address` STRING,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`p` INT"
-        spark.read.format(format).schema(schema).load(data_path + "/contacts").createOrReplaceTempView("contacts")
-        return spark.sql(query)
-    conf={"spark.sql.parquet.enableVectorizedReader": vectorized}
+    def read_temp_view(schema):
+        def do_it(spark):
+            spark.read.format(format).schema(schema).load(data_path + "/contacts").createOrReplaceTempView("contacts")
+            return spark.sql(query)
+        return do_it
+    conf={"spark.sql.parquet.enableVectorizedReader": "true"}
     create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf)

 # https://github.com/NVIDIA/spark-rapids/issues/8715
 @pytest.mark.parametrize('select_and_expected_schemata', [("friend.First", "struct<friends:array<struct<first:string>>>"),
                                                           ("friend.MIDDLE", "struct<friends:array<struct<middle:string>>>")])
-@pytest.mark.parametrize('vectorized', ["true", "false"])
 @pytest.mark.skipif(is_before_spark_320(), reason='https://issues.apache.org/jira/browse/SPARK-34638')
 @pytest.mark.parametrize('is_partitioned', [True, False])
 @pytest.mark.parametrize('format', ["parquet", "orc"])
-def test_nested_column_prune_on_generator_output(format, spark_tmp_path, select_and_expected_schemata, vectorized, is_partitioned):
+def test_nested_column_prune_on_generator_output(format, spark_tmp_path, select_and_expected_schemata, is_partitioned):
     query, expected_schemata = select_and_expected_schemata
     data_path = spark_tmp_path + "/DATA"
-    def read_temp_view(spark):
-        schema = "`id` INT, `name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " + \
-                 "`address` STRING,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`p` INT"
-        spark.read.format(format).schema(schema).load(data_path + "/contacts").createOrReplaceTempView("contacts")
-        return spark.table("contacts").select(f.explode(f.col("friends")).alias("friend")).select(query)
-
+    def read_temp_view(schema):
+        def do_it(spark):
+            spark.read.format(format).schema(schema).load(data_path + "/contacts").createOrReplaceTempView("contacts")
+            return spark.table("contacts").select(f.explode(f.col("friends")).alias("friend")).select(query)
+        return do_it
     conf = {"spark.sql.caseSensitive": "false",
-            "spark.sql.parquet.enableVectorizedReader": vectorized}
+            "spark.sql.parquet.enableVectorizedReader": "true"}
     create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf)
\ No newline at end of file
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala
index 14ecce5dc18..4b6813d7469 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/ExecutionPlanCaptureCallback.scala
@@ -83,6 +83,11 @@ object ExecutionPlanCaptureCallback extends AdaptiveSparkPlanHelper {
     fallbackCpuClassList.foreach(fallbackCpuClass => assertDidFallBack(gpuPlans, fallbackCpuClass))
   }

+  /**
+   * This method is used by the Python integration tests.
+   * The method checks the schemata used in the GPU and CPU executed plans and compares it to the
+   * expected schemata to make sure we are not reading more data than needed
+   */
   def assertSchemataMatch(cpuDf: DataFrame, gpuDf: DataFrame, expectedSchema: String): Unit = {
     import org.apache.spark.sql.execution.FileSourceScanExec
     import org.apache.spark.sql.types.StructType
@@ -104,9 +109,12 @@ object ExecutionPlanCaptureCallback extends AdaptiveSparkPlanHelper {
         case otherType: StructType =>
           assert(gpuScanSchema.sameType(otherType))
           val expectedStructType = CatalystSqlParser.parseDataType(expectedSchema)
-          assert(gpuScanSchema.sameType(expectedStructType))
-          assert(cpuScanSchema.sameType(expectedStructType))
-        case _ => assert(false)
+          assert(gpuScanSchema.sameType(expectedStructType),
+            s"Type GPU schema ${gpuScanSchema.toDDL} doesn't match $expectedSchema")
+          assert(cpuScanSchema.sameType(expectedStructType),
+            s"Type CPU schema ${cpuScanSchema.toDDL} doesn't match $expectedSchema")
+        case otherType => assert(false, s"The expected type $cpuScanSchema" +
+          s" doesn't match the actual type $otherType")
         }
     }
   }

From ec75ae637d7ae393feff7fc9fd03ae1b2fb655ad Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Tue, 25 Jul 2023 18:50:18 -0700
Subject: [PATCH 7/9] added comments

---
 .../src/main/python/prune_partition_column_test.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/integration_tests/src/main/python/prune_partition_column_test.py b/integration_tests/src/main/python/prune_partition_column_test.py
index 0142f529ce0..3abdc4a0a36 100644
--- a/integration_tests/src/main/python/prune_partition_column_test.py
+++ b/integration_tests/src/main/python/prune_partition_column_test.py
@@ -130,6 +130,8 @@ def test_prune_partition_column_when_filter_fallback_project(spark_tmp_path, pru
     do_prune_partition_column_when_filter_project(spark_tmp_path, prune_part_enabled, file_format,
                                                   filter_col, gpu_project_enabled=False)

+# This method creates two tables and saves them to partitioned Parquet/ORC files. The file is then
+# read in using the read function that is passed in
 def create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, func, conf):
     full_name_type = StructGen([('first', StringGen()), ('middle', StringGen()), ('last', StringGen())])
     name_type = StructGen([('first', StringGen()), ('last', StringGen())])
From ed84eb2b31cf50727a86cc3694253cc235dc009e Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Fri, 4 Aug 2023 13:56:40 -0700
Subject: [PATCH 8/9] skip failing method instead of xfail

---
 .../python/prune_partition_column_test.py | 28 ++++++++++---------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/integration_tests/src/main/python/prune_partition_column_test.py b/integration_tests/src/main/python/prune_partition_column_test.py
index 3abdc4a0a36..520e5a1f11e 100644
--- a/integration_tests/src/main/python/prune_partition_column_test.py
+++ b/integration_tests/src/main/python/prune_partition_column_test.py
@@ -132,7 +132,7 @@ def test_prune_partition_column_when_filter_fallback_project(spark_tmp_path, pru

 # This method creates two tables and saves them to partitioned Parquet/ORC files. The file is then
 # read in using the read function that is passed in
-def create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, func, conf):
+def create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, func, conf, table_name):
     full_name_type = StructGen([('first', StringGen()), ('middle', StringGen()), ('last', StringGen())])
     name_type = StructGen([('first', StringGen()), ('last', StringGen())])
     contacts_data_gen = StructGen([
@@ -155,8 +155,8 @@ def contact_gen_df(spark, data_gen, partition):
         else:
             return gen

-    with_cpu_session(lambda spark: contact_gen_df(spark, contacts_data_gen, 1).write.format(format).save(data_path + "/contacts/p=1"))
-    with_cpu_session(lambda spark: contact_gen_df(spark, brief_contacts_data_gen, 2).write.format(format).save(data_path + "/contacts/p=2"))
+    with_cpu_session(lambda spark: contact_gen_df(spark, contacts_data_gen, 1).write.format(format).save(data_path + f"/{table_name}/p=1"))
+    with_cpu_session(lambda spark: contact_gen_df(spark, brief_contacts_data_gen, 2).write.format(format).save(data_path + f"/{table_name}/p=2"))

     # Schema to read in.
     read_schema = contacts_data_gen.data_type.add("p", IntegerType(), True) if is_partitioned else contacts_data_gen.data_type
@@ -173,21 +173,22 @@ def contact_gen_df(spark, data_gen, partition):
 # https://github.com/NVIDIA/spark-rapids/issues/8712
 # https://github.com/NVIDIA/spark-rapids/issues/8713
 # https://github.com/NVIDIA/spark-rapids/issues/8714
-@pytest.mark.parametrize('query_and_expected_schemata', [("select friends.middle, friends from contacts where p=1", "struct<friends:array<struct<first:string,middle:string,last:string>>>"),
-                                                         pytest.param(("select name.middle, address from contacts where p=2", "struct<name:struct<middle:string>,address:string>"), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/8788')),
-                                                         ("select name.first from contacts where name.first = 'Jane'", "struct<name:struct<first:string>>")])
+@pytest.mark.parametrize('query_and_expected_schemata', [("select friends.middle, friends from {} where p=1", "struct<friends:array<struct<first:string,middle:string,last:string>>>"),
+                                                         pytest.param(("select name.middle, address from {} where p=2", "struct<name:struct<middle:string>,address:string>"), marks=pytest.mark.skip(reason='https://github.com/NVIDIA/spark-rapids/issues/8788')),
+                                                         ("select name.first from {} where name.first = 'Jane'", "struct<name:struct<first:string>>")])
 @pytest.mark.parametrize('is_partitioned', [True, False])
 @pytest.mark.parametrize('format', ["parquet", "orc"])
-def test_select_complex_field(format, spark_tmp_path, query_and_expected_schemata, is_partitioned):
+def test_select_complex_field(format, spark_tmp_path, query_and_expected_schemata, is_partitioned, spark_tmp_table_factory):
+    table_name = spark_tmp_table_factory.get()
     query, expected_schemata = query_and_expected_schemata
     data_path = spark_tmp_path + "/DATA"
     def read_temp_view(schema):
         def do_it(spark):
-            spark.read.format(format).schema(schema).load(data_path + "/contacts").createOrReplaceTempView("contacts")
-            return spark.sql(query)
+            spark.read.format(format).schema(schema).load(data_path + f"/{table_name}").createOrReplaceTempView(table_name)
+            return spark.sql(query.format(table_name))
         return do_it
     conf={"spark.sql.parquet.enableVectorizedReader": "true"}
-    create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf)
+    create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf, table_name)

 # https://github.com/NVIDIA/spark-rapids/issues/8715
 @pytest.mark.parametrize('select_and_expected_schemata', [("friend.First", "struct<friends:array<struct<first:string>>>"),
@@ -195,14 +196,15 @@ def do_it(spark):
 @pytest.mark.skipif(is_before_spark_320(), reason='https://issues.apache.org/jira/browse/SPARK-34638')
 @pytest.mark.parametrize('is_partitioned', [True, False])
 @pytest.mark.parametrize('format', ["parquet", "orc"])
-def test_nested_column_prune_on_generator_output(format, spark_tmp_path, select_and_expected_schemata, is_partitioned):
+def test_nested_column_prune_on_generator_output(format, spark_tmp_path, select_and_expected_schemata, is_partitioned, spark_tmp_table_factory):
+    table_name = spark_tmp_table_factory.get()
     query, expected_schemata = select_and_expected_schemata
     data_path = spark_tmp_path + "/DATA"
     def read_temp_view(schema):
         def do_it(spark):
-            spark.read.format(format).schema(schema).load(data_path + "/contacts").createOrReplaceTempView("contacts")
+            spark.read.format(format).schema(schema).load(data_path + f"/{table_name}").createOrReplaceTempView(table_name)
             return spark.table("contacts").select(f.explode(f.col("friends")).alias("friend")).select(query)
         return do_it
     conf = {"spark.sql.caseSensitive": "false",
             "spark.sql.parquet.enableVectorizedReader": "true"}
-    create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf)
\ No newline at end of file
+    create_contacts_table_and_read(is_partitioned, format, data_path, expected_schemata, read_temp_view, conf, table_name)
\ No newline at end of file

From 887272e8af65f022f208c35855198f7b86ac4dc4 Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Mon, 7 Aug 2023 10:48:00 -0700
Subject: [PATCH 9/9] use the new tmp table name

---
 .../src/main/python/prune_partition_column_test.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/integration_tests/src/main/python/prune_partition_column_test.py b/integration_tests/src/main/python/prune_partition_column_test.py
index 520e5a1f11e..6948cc72519 100644
--- a/integration_tests/src/main/python/prune_partition_column_test.py
+++ b/integration_tests/src/main/python/prune_partition_column_test.py
@@ -203,7 +203,7 @@ def test_nested_column_prune_on_generator_output(format, spark_tmp_path, select_
     def read_temp_view(schema):
         def do_it(spark):
             spark.read.format(format).schema(schema).load(data_path + f"/{table_name}").createOrReplaceTempView(table_name)
-            return spark.table("contacts").select(f.explode(f.col("friends")).alias("friend")).select(query)
+            return spark.table(table_name).select(f.explode(f.col("friends")).alias("friend")).select(query)
         return do_it
     conf = {"spark.sql.caseSensitive": "false",
             "spark.sql.parquet.enableVectorizedReader": "true"}
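For reference, the generator-output case that test_nested_column_prune_on_generator_output targets can also be reproduced without the harness. The sketch below (illustrative path, rows, and session settings) explodes the friends array and selects a field with mismatched case, which the test above ties to spark.sql.caseSensitive=false and to Spark 3.2+ via SPARK-34638:

    # Illustrative sketch only; not part of the patch series.
    import pyspark.sql.functions as f
    from pyspark.sql import Row, SparkSession

    spark = (SparkSession.builder.appName("generator-pruning-sketch")
             .config("spark.sql.caseSensitive", "false")
             .getOrCreate())

    contacts = [Row(id=1,
                    name=Row(first="Jane", middle="X", last="Doe"),
                    address="123 Main St",
                    friends=[Row(first="Susan", middle="Z", last="Smith")])]
    spark.createDataFrame(contacts).write.mode("overwrite").parquet("/tmp/contacts_gen_sketch")

    df = (spark.read.parquet("/tmp/contacts_gen_sketch")
          .select(f.explode(f.col("friends")).alias("friend"))
          .select("friend.First"))
    df.explain()  # on Spark 3.2+ the scan should only need friends.first
    df.show()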