From d044e05b0ff5b368a3316ef5fc886672ab67c56d Mon Sep 17 00:00:00 2001
From: Firestarman
Date: Tue, 14 May 2024 09:44:08 +0000
Subject: [PATCH] Fix a test error for DB13.3

Signed-off-by: Firestarman
---
 integration_tests/src/main/python/udf_test.py | 12 --------
 .../python/GpuAggregateInPandasExec.scala     | 29 +++++++------------
 .../python/GpuFlatMapGroupsInPandasExec.scala |  7 +++--
 .../shims/GpuGroupedPythonRunnerFactory.scala |  7 +++--
 .../shims/GpuGroupedPythonRunnerFactory.scala |  9 +++---
 .../shims/GpuGroupedPythonRunnerFactory.scala | 11 +++----
 6 files changed, 30 insertions(+), 45 deletions(-)

diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py
index 56b47d80f89..7ca3e84e9ba 100644
--- a/integration_tests/src/main/python/udf_test.py
+++ b/integration_tests/src/main/python/udf_test.py
@@ -98,8 +98,6 @@ def nested_size(nested):
 # ======= Test aggregate in Pandas =======
 @approximate_float
 @pytest.mark.parametrize('data_gen', integral_gens, ids=idfn)
-@pytest.mark.xfail(condition=is_databricks_runtime() and is_spark_341(),
-                   reason='https://github.com/NVIDIA/spark-rapids/issues/10797')
 def test_single_aggregate_udf(data_gen):
     @f.pandas_udf('double')
     def pandas_sum(to_process: pd.Series) -> float:
@@ -113,8 +111,6 @@ def pandas_sum(to_process: pd.Series) -> float:
 
 @approximate_float
 @pytest.mark.parametrize('data_gen', arrow_common_gen, ids=idfn)
-@pytest.mark.xfail(condition=is_databricks_runtime() and is_spark_341(),
-                   reason='https://github.com/NVIDIA/spark-rapids/issues/10797')
 def test_single_aggregate_udf_more_types(data_gen):
     @f.pandas_udf('double')
     def group_size_udf(to_process: pd.Series) -> float:
@@ -128,8 +124,6 @@ def group_size_udf(to_process: pd.Series) -> float:
 
 @ignore_order
 @pytest.mark.parametrize('data_gen', integral_gens, ids=idfn)
-@pytest.mark.xfail(condition=is_databricks_runtime() and is_spark_341(),
-                   reason='https://github.com/NVIDIA/spark-rapids/issues/10797')
 def test_group_aggregate_udf(data_gen):
     @f.pandas_udf('long')
     def pandas_sum(to_process: pd.Series) -> int:
@@ -147,8 +141,6 @@ def pandas_sum(to_process: pd.Series) -> int:
 
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', arrow_common_gen, ids=idfn)
-@pytest.mark.xfail(condition=is_databricks_runtime() and is_spark_341(),
-                   reason='https://github.com/NVIDIA/spark-rapids/issues/10797')
 def test_group_aggregate_udf_more_types(data_gen):
     @f.pandas_udf('long')
     def group_size_udf(to_process: pd.Series) -> int:
@@ -246,8 +238,6 @@ def pandas_size(to_process: pd.Series) -> int:
 @ignore_order(local=True)
 @pytest.mark.parametrize('zero_enabled', [False, True])
 @pytest.mark.parametrize('data_gen', [LongGen()], ids=idfn)
-@pytest.mark.xfail(condition=is_databricks_runtime() and is_spark_341(),
-                   reason='https://github.com/NVIDIA/spark-rapids/issues/10797')
 def test_group_apply_udf_zero_conf(data_gen, zero_enabled):
     def pandas_add(data):
         data.sum = data.b + data.a
@@ -281,8 +271,6 @@ def pandas_add(data):
 
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', arrow_common_gen, ids=idfn)
-@pytest.mark.xfail(condition=is_databricks_runtime() and is_spark_341(),
-                   reason='https://github.com/NVIDIA/spark-rapids/issues/10797')
 def test_group_apply_udf_more_types(data_gen):
     def group_size_udf(key, pdf):
         return pd.DataFrame([[len(key), len(pdf), len(pdf.columns)]])
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala
index 3f3f2803f5c..bc2f30dff2f 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -31,8 +31,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.rapids.execution.python.shims.GpuArrowPythonRunner
-import org.apache.spark.sql.rapids.shims.{ArrowUtilsShim, DataTypeUtilsShim}
+import org.apache.spark.sql.rapids.execution.python.shims.GpuGroupedPythonRunnerFactory
+import org.apache.spark.sql.rapids.shims.DataTypeUtilsShim
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
@@ -109,8 +109,6 @@ case class GpuAggregateInPandasExec(
     val (mNumInputRows, mNumInputBatches, mNumOutputRows, mNumOutputBatches) = commonGpuMetrics()
 
     lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)
-    val sessionLocalTimeZone = conf.sessionLocalTimeZone
-    val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)
     val childOutput = child.output
     val resultExprs = resultExpressions
 
@@ -204,27 +202,22 @@ case class GpuAggregateInPandasExec(
         }
       }
 
+      val runnerFactory = GpuGroupedPythonRunnerFactory(conf, pyFuncs, argOffsets,
+        aggInputSchema, DataTypeUtilsShim.fromAttributes(pyOutAttributes),
+        PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF)
+
       // Third, sends to Python to execute the aggregate and returns the result.
       if (pyInputIter.hasNext) {
         // Launch Python workers only when the data is not empty.
-        val pyRunner = new GpuArrowPythonRunner(
-          pyFuncs,
-          PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
-          argOffsets,
-          aggInputSchema,
-          sessionLocalTimeZone,
-          pythonRunnerConf,
-          // The whole group data should be written in a single call, so here is unlimited
-          Int.MaxValue,
-          DataTypeUtilsShim.fromAttributes(pyOutAttributes))
-
+        val pyRunner = runnerFactory.getRunner()
         val pyOutputIterator = pyRunner.compute(pyInputIter, context.partitionId(), context)
 
         val combinedAttrs = gpuGroupingExpressions.map(_.toAttribute) ++ pyOutAttributes
         val resultRefs = GpuBindReferences.bindGpuReferences(resultExprs, combinedAttrs)
         // Gets the combined batch for each group and projects for the output.
-        new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator, pyRunner,
-          mNumOutputRows, mNumOutputBatches).map { combinedBatch =>
+        new CombiningIterator(batchProducer.getBatchQueue, pyOutputIterator,
+          pyRunner.asInstanceOf[GpuArrowOutput], mNumOutputRows,
+          mNumOutputBatches).map { combinedBatch =>
           withResource(combinedBatch) { batch =>
             GpuProjectExec.project(batch, resultRefs)
           }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala
index 6c2f716583f..4a24a449b24 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuFlatMapGroupsInPandasExec.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2020-2023, NVIDIA CORPORATION.
+ * Copyright (c) 2020-2024, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,7 +21,7 @@ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
 import com.nvidia.spark.rapids.shims.ShimUnaryExecNode
 
 import org.apache.spark.TaskContext
-import org.apache.spark.api.python.ChainedPythonFunctions
+import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
@@ -123,7 +123,8 @@ case class GpuFlatMapGroupsInPandasExec(
       resolveArgOffsets(child, groupingAttributes)
 
     val runnerFactory = GpuGroupedPythonRunnerFactory(conf, chainedFunc, Array(argOffsets),
-      DataTypeUtilsShim.fromAttributes(dedupAttrs), pythonOutputSchema)
+      DataTypeUtilsShim.fromAttributes(dedupAttrs), pythonOutputSchema,
+      PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
 
     // Start processing. Map grouped batches to ArrowPythonRunner results.
     child.executeColumnar().mapPartitionsInternal { inputIter =>
diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
index ad3d2522475..aa2db66ff06 100644
--- a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
+++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
@@ -41,7 +41,7 @@ spark-rapids-shim-json-lines ***/
 
 package org.apache.spark.sql.rapids.execution.python.shims
 
-import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -51,14 +51,15 @@ case class GpuGroupedPythonRunnerFactory(
     chainedFunc: Seq[ChainedPythonFunctions],
     argOffsets: Array[Array[Int]],
     dedupAttrs: StructType,
-    pythonOutputSchema: StructType) {
+    pythonOutputSchema: StructType,
+    evalType: Int) {
   val sessionLocalTimeZone = conf.sessionLocalTimeZone
   val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)
 
   def getRunner(): GpuBasePythonRunner[ColumnarBatch] = {
     new GpuArrowPythonRunner(
       chainedFunc,
-      PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+      evalType,
       argOffsets,
       dedupAttrs,
       sessionLocalTimeZone,
diff --git a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
index c97bf1abd3e..451de0a2527 100644
--- a/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
+++ b/sql-plugin/src/main/spark330db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
@@ -20,7 +20,7 @@ spark-rapids-shim-json-lines ***/
 
 package org.apache.spark.sql.rapids.execution.python.shims
 
-import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
+import org.apache.spark.api.python.ChainedPythonFunctions
 import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -30,7 +30,8 @@ case class GpuGroupedPythonRunnerFactory(
     chainedFunc: Seq[ChainedPythonFunctions],
     argOffsets: Array[Array[Int]],
     dedupAttrs: StructType,
-    pythonOutputSchema: StructType) {
+    pythonOutputSchema: StructType,
+    evalType: Int) {
   // Configs from DB runtime
   val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice
   val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled
@@ -41,7 +42,7 @@ case class GpuGroupedPythonRunnerFactory(
     if (zeroConfEnabled && maxBytes > 0L) {
       new GpuGroupUDFArrowPythonRunner(
         chainedFunc,
-        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+        evalType,
         argOffsets,
         dedupAttrs,
         sessionLocalTimeZone,
@@ -52,7 +53,7 @@ case class GpuGroupedPythonRunnerFactory(
     } else {
       new GpuArrowPythonRunner(
         chainedFunc,
-        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+        evalType,
         argOffsets,
         dedupAttrs,
         sessionLocalTimeZone,
diff --git a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
index f2297248711..b1dabbf5b5e 100644
--- a/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
+++ b/sql-plugin/src/main/spark341db/scala/org/apache/spark/sql/rapids/execution/python/shims/GpuGroupedPythonRunnerFactory.scala
@@ -24,24 +24,25 @@ import org.apache.spark.sql.rapids.shims.ArrowUtilsShim
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-//TODO is this needed? we already have a similar version in spark330db
 case class GpuGroupedPythonRunnerFactory(
     conf: org.apache.spark.sql.internal.SQLConf,
     chainedFunc: Seq[ChainedPythonFunctions],
     argOffsets: Array[Array[Int]],
     dedupAttrs: StructType,
-    pythonOutputSchema: StructType) {
+    pythonOutputSchema: StructType,
+    evalType: Int) {
   // Configs from DB runtime
   val maxBytes = conf.pandasZeroConfConversionGroupbyApplyMaxBytesPerSlice
   val zeroConfEnabled = conf.pandasZeroConfConversionGroupbyApplyEnabled
+  val isArrowBatchSlicingEnabled = conf.pythonArrowBatchSlicingEnabled
   val sessionLocalTimeZone = conf.sessionLocalTimeZone
   val pythonRunnerConf = ArrowUtilsShim.getPythonRunnerConfMap(conf)
 
   def getRunner(): GpuBasePythonRunner[ColumnarBatch] = {
-    if (zeroConfEnabled && maxBytes > 0L) {
+    if (isArrowBatchSlicingEnabled || (zeroConfEnabled && maxBytes > 0L)) {
       new GpuGroupUDFArrowPythonRunner(
         chainedFunc,
-        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+        evalType,
         argOffsets,
         dedupAttrs,
         sessionLocalTimeZone,
@@ -52,7 +53,7 @@ case class GpuGroupedPythonRunnerFactory(
     } else {
       new GpuArrowPythonRunner(
         chainedFunc,
-        PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+        evalType,
         argOffsets,
         dedupAttrs,
         sessionLocalTimeZone,