From c42540537165f023348a768118865c1592d56fd7 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Tue, 25 Jun 2024 12:26:00 +0900 Subject: [PATCH 1/4] Python UDF in higher order functions should not throw internal error --- .../main/resources/error/error-conditions.json | 5 +++++ .../sql/catalyst/analysis/CheckAnalysis.scala | 8 ++++++++ .../sql/execution/python/PythonUDFSuite.scala | 16 ++++++++++++++-- 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 975536c076ddb..9ef5c120b2165 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -2425,6 +2425,11 @@ "message" : [ "A higher order function expects arguments, but got ." ] + }, + "PYTHON_UDF" : { + "message" : [ + "Python UDFs should be used in a lambda function at a higher order function. However, was a Python UDF." + ] } }, "sqlState" : "42K0D" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index bd8f8fe9f6528..2d87cbbfecb4f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -254,6 +254,14 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB hof.invalidFormat(checkRes) } + case hof: HigherOrderFunction + if hof.resolved && hof.functions + .exists(_.exists(_.isInstanceOf[PythonUDF])) => + val u = hof.functions.flatMap(_.find(_.isInstanceOf[PythonUDF])).head + hof.failAnalysis( + errorClass = "INVALID_LAMBDA_FUNCTION_CALL.PYTHON_UDF", + messageParameters = Map("funcName" -> toSQLExpr(u))) + // If an attribute can't be resolved as a map key of string type, either the key should be // surrounded with single quotes, or there is a typo in the attribute name. case GetMapValue(map, key: Attribute) if isMapWithStringKey(map) && !key.resolved => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala index 3101281251b1b..30ab2be308e2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala @@ -17,8 +17,8 @@ package org.apache.spark.sql.execution.python -import org.apache.spark.sql.{IntegratedUDFTestUtils, QueryTest} -import org.apache.spark.sql.functions.count +import org.apache.spark.sql.{AnalysisException, IntegratedUDFTestUtils, QueryTest} +import org.apache.spark.sql.functions.{array, count, transform} import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.LongType @@ -112,4 +112,16 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession { val pandasTestUDF = TestGroupedAggPandasUDF(name = udfName) assert(df.agg(pandasTestUDF(df("id"))).schema.fieldNames.exists(_.startsWith(udfName))) } + + test("SPARK-48706: Negative test case for Python UDF in higher order functions") { + assume(shouldTestPythonUDFs) + checkError( + exception = intercept[AnalysisException] { + spark.range(1).select(transform(array("id"), x => pythonTestUDF(x))).collect() + }, + errorClass = "INVALID_LAMBDA_FUNCTION_CALL.PYTHON_UDF", + parameters = Map("funcName" -> "\"pyUDF(namedlambdavariable())\""), + context = ExpectedContext( + "transform", s".*${this.getClass.getSimpleName}.*")) + } } From c53aa7e2043dbd1606a2803c00f67ca8c14263ce Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Tue, 25 Jun 2024 15:55:59 +0900 Subject: [PATCH 2/4] Address a comment --- .../src/main/resources/error/error-conditions.json | 10 +++++----- .../spark/sql/catalyst/analysis/CheckAnalysis.scala | 2 +- .../spark/sql/execution/python/PythonUDFSuite.scala | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 9ef5c120b2165..40ea48fdb5cd6 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -2425,11 +2425,6 @@ "message" : [ "A higher order function expects arguments, but got ." ] - }, - "PYTHON_UDF" : { - "message" : [ - "Python UDFs should be used in a lambda function at a higher order function. However, was a Python UDF." - ] } }, "sqlState" : "42K0D" @@ -4650,6 +4645,11 @@ "message" : [ "TRANSFORM with SERDE is only supported in hive mode." ] + }, + "LAMBDA_FUNCTION_WITH_PYTHON_UDF" : { + "message" : [ + "Python UDFs with a lambda function at a higher order function is unsupported. However, a Python UDF was found." + ] } }, "sqlState" : "0A000" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 2d87cbbfecb4f..9f3eee5198a16 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -259,7 +259,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB .exists(_.exists(_.isInstanceOf[PythonUDF])) => val u = hof.functions.flatMap(_.find(_.isInstanceOf[PythonUDF])).head hof.failAnalysis( - errorClass = "INVALID_LAMBDA_FUNCTION_CALL.PYTHON_UDF", + errorClass = "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF", messageParameters = Map("funcName" -> toSQLExpr(u))) // If an attribute can't be resolved as a map key of string type, either the key should be diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala index 30ab2be308e2f..2e56ad0ab4160 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/python/PythonUDFSuite.scala @@ -119,7 +119,7 @@ class PythonUDFSuite extends QueryTest with SharedSparkSession { exception = intercept[AnalysisException] { spark.range(1).select(transform(array("id"), x => pythonTestUDF(x))).collect() }, - errorClass = "INVALID_LAMBDA_FUNCTION_CALL.PYTHON_UDF", + errorClass = "UNSUPPORTED_FEATURE.LAMBDA_FUNCTION_WITH_PYTHON_UDF", parameters = Map("funcName" -> "\"pyUDF(namedlambdavariable())\""), context = ExpectedContext( "transform", s".*${this.getClass.getSimpleName}.*")) From 3b60e26439eb6559fd874ca06d0d4c9bf4676671 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Tue, 25 Jun 2024 17:42:03 +0900 Subject: [PATCH 3/4] fixup --- common/utils/src/main/resources/error/error-conditions.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 40ea48fdb5cd6..64e8625e30b5e 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4648,7 +4648,7 @@ }, "LAMBDA_FUNCTION_WITH_PYTHON_UDF" : { "message" : [ - "Python UDFs with a lambda function at a higher order function is unsupported. However, a Python UDF was found." + "Lambda function with Python UDF in a higher order function." ] } }, From e9063c80209b68d81e3c1b6b0ef95a532cbdd630 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Wed, 26 Jun 2024 08:42:20 +0900 Subject: [PATCH 4/4] fixup --- .../src/main/resources/error/error-conditions.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json index 64e8625e30b5e..4d291784899bd 100644 --- a/common/utils/src/main/resources/error/error-conditions.json +++ b/common/utils/src/main/resources/error/error-conditions.json @@ -4476,6 +4476,11 @@ "INSERT INTO with IF NOT EXISTS in the PARTITION spec." ] }, + "LAMBDA_FUNCTION_WITH_PYTHON_UDF" : { + "message" : [ + "Lambda function with Python UDF in a higher order function." + ] + }, "LATERAL_COLUMN_ALIAS_IN_AGGREGATE_FUNC" : { "message" : [ "Referencing a lateral column alias in the aggregate function ." @@ -4645,11 +4650,6 @@ "message" : [ "TRANSFORM with SERDE is only supported in hive mode." ] - }, - "LAMBDA_FUNCTION_WITH_PYTHON_UDF" : { - "message" : [ - "Lambda function with Python UDF in a higher order function." - ] } }, "sqlState" : "0A000"