[SPARK-22939] [PySpark] Support Spark UDF in registerFunction #20137
Conversation
Test build #85607 has finished for PR 20137 at commit
Test build #85608 has finished for PR 20137 at commit
Test build #85616 has finished for PR 20137 at commit
Test build #85617 has finished for PR 20137 at commit
Test build #85618 has finished for PR 20137 at commit
Hey @gatorsmile, I was just looking into this now. How about we have:

```diff
 wrapper.returnType = self.returnType
 wrapper.evalType = self.evalType
-wrapper.asNondeterministic = self.asNondeterministic
+wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()
+wrapper._unwrapped = lambda: self
 return wrapper
```

and then we do:

```python
if hasattr(f, "_unwrapped"):
    f = f._unwrapped()
if isinstance(f, UserDefinedFunction):
    udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
                              evalType=PythonEvalType.SQL_BATCHED_UDF)
    udf = udf if f._deterministic else udf.asNondeterministic()
else:
    # Existing logic.
    ...
```

Then running:

```python
from pyspark.sql.functions import udf
help(udf(lambda: 1, "integer").asNondeterministic())
```

I haven't tested the suggestion above, but I think this is going to roughly work and resolve two issues too.
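For context, the wrapper/`_unwrapped` round-trip being proposed can be sketched with a plain-Python stand-in. `FakeUDF` below is a hypothetical class, not PySpark's real `UserDefinedFunction`; it only illustrates the pattern of attaching attributes to a wrapper function and keeping a way back to the underlying object:

```python
import functools

# Hypothetical, simplified stand-in for pyspark.sql.udf.UserDefinedFunction.
class FakeUDF(object):
    def __init__(self, func):
        self.func = func
        self._deterministic = True

    def asNondeterministic(self):
        self._deterministic = False
        return self

    def _wrapped(self):
        @functools.wraps(self.func)
        def wrapper(*args):
            return self.func(*args)
        wrapper.func = self.func
        # asNondeterministic on the wrapper returns a *wrapped* UDF again,
        # so callers always get a plain callable with proper attributes.
        wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()
        # _unwrapped lets registration code recover the underlying UDF object.
        wrapper._unwrapped = lambda: self
        return wrapper

u = FakeUDF(lambda: 1)
f = u._wrapped()            # a plain function that still calls u.func
g = f.asNondeterministic()  # another wrapped function over the same UDF
```

The key property is that `f._unwrapped()` returns the original object, so a `registerFunction`-style API can dispatch on the real UDF rather than on the wrapper.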
@HyukjinKwon We need to fix
Ur, @gatorsmile, then, we will return a wrapped function from
I am not against anything, but the outputs of the following two are inconsistent. It looks confusing to end users.
BTW, this PR is not just for
but if we do `wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()`, I think it will still show a proper pydoc.
Can you run the command?
Let me test it and be back soon.
Take your time. I will not be online in the next two hours.
With this diff:

```diff
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 54b5a8656e1..24de9839e90 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -162,7 +162,8 @@ class UserDefinedFunction(object):
         wrapper.func = self.func
         wrapper.returnType = self.returnType
         wrapper.evalType = self.evalType
-        wrapper.asNondeterministic = self.asNondeterministic
+        wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()
+        wrapper._unwrapped = lambda: self
         return wrapper
```

Before:

```python
from pyspark.sql.functions import udf
help(udf(lambda: 1, "integer").asNondeterministic())
```

```python
from pyspark.sql.functions import udf
help(udf(lambda: 1, "integer"))
```

After:

```python
from pyspark.sql.functions import udf
help(udf(lambda: 1, "integer").asNondeterministic())
```

```python
from pyspark.sql.functions import udf
help(udf(lambda: 1, "integer"))
```
With this diff:

```diff
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -173,4 +173,4 @@ class UserDefinedFunction(object):
         .. versionadded:: 2.3
         """
         self._deterministic = False
-        return self
+        return self._wrapped()
```

After:

```python
from pyspark.sql.functions import udf
help(udf(lambda: 1, "integer").asNondeterministic())
```

```python
from pyspark.sql.functions import udf
help(udf(lambda: 1, "integer"))
```
Test build #85619 has finished for PR 20137 at commit
Yup, the fix for deterministic UDFs seems fine, but the change about
If you meant the docstring about
I think we can do the things below:

```python
wrapper.asNondeterministic = functools.wraps(
    self.asNondeterministic)(lambda: self.asNondeterministic()._wrapped())
```
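A minimal, self-contained illustration of why `functools.wraps` matters in the snippet above. The `asNondeterministic` function below is an invented example, not PySpark's actual method; the point is only how `wraps` transfers metadata onto a lambda:

```python
import functools

def asNondeterministic():
    """Updates the UDF to nondeterministic (illustrative docstring)."""
    return "new udf"

# A bare lambda shows up in help()/pydoc as "<lambda>" with no docstring.
bare = lambda: asNondeterministic()

# functools.wraps copies __name__, __doc__, __module__, etc. onto the lambda,
# so help(f.asNondeterministic) renders a proper pydoc entry.
nice = functools.wraps(asNondeterministic)(lambda: asNondeterministic())
```

Both callables behave identically when invoked; only the introspection metadata differs.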
Thank you for bearing with me @gatorsmile.
Test build #85625 has finished for PR 20137 at commit
@HyukjinKwon Thank you for your comment!
python/pyspark/sql/catalog.py (outdated)

```python
>>> from pyspark.sql.types import IntegerType, StringType
>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
>>> newRandom_udf = spark.catalog.registerFunction(
...     "random_udf", random_udf, StringType())  # doctest: +SKIP
```

why skip the test? we can use a fixed seed
The output contains a hex value.
python/pyspark/sql/tests.py (outdated)

```python
self.assertEqual(row[0], "6")
[row] = self.spark.range(1).select(random_udf()).collect()
self.assertEqual(row[0], 6)
pydoc.render_doc(udf(lambda: random.randint(6, 6), IntegerType()))
```

what does it do?
This is to test a help function. See https://github.com/gatorsmile/spark/blob/85f11bfbfb564acb670097ff4ce520bfbc79b855/python/pyspark/sql/tests.py#L1681-L1688
Can we put this test there or make it separate from test_non_deterministic_udf? Adding comments is also fine to me.
will add a comment.
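As a side note, `pydoc.render_doc` works well as a lightweight "help() still works" smoke test, in the same spirit as the test line discussed above. The function name below is made up for illustration:

```python
import pydoc

def fixed_udf():
    """Returns the fixed value 6."""
    return 6

# render_doc returns the same text help() would page through, so asserting
# on it verifies the docstring survives any wrapping we apply.
text = pydoc.render_doc(fixed_udf, renderer=pydoc.plaintext)
```

If a wrapper dropped the docstring, the rendered text would fall back to a bare signature, which is exactly the regression such a test catches.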
Test build #85636 has finished for PR 20137 at commit
python/pyspark/sql/catalog.py (outdated)

```python
>>> from pyspark.sql.types import IntegerType, StringType
>>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
>>> newRandom_udf = spark.catalog.registerFunction(
...     "random_udf", random_udf, StringType())  # doctest: +SKIP
```

BTW, I think we can remove `# doctest: +SKIP` for this line because this line simply assigns a value to `newRandom_udf`?
`newRandom_udf` is also used.
```python
        return judf

    def __call__(self, *cols):
        judf = self._judf
        sc = SparkContext._active_spark_context
        return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

    # This function is for improving the online help system in the interactive interpreter.
    # For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and
    # argument annotation. (See: SPARK-19161)
```

I think we can put this in the docstring of `_wrapped`, between L148 and L150.
I do not want to expose these comments to the doc.
python/pyspark/sql/catalog.py (outdated)

```python
        udf = UserDefinedFunction(f, returnType=returnType, name=name,
                                  evalType=PythonEvalType.SQL_BATCHED_UDF)

        if hasattr(f, 'asNondeterministic'):
```

Actually, this one made me suggest the `wrapper._unwrapped = lambda: self` way. So, here this can be a wrapped function or a `UserDefinedFunction`, and I thought it's not quite clear what we expect here by `hasattr(f, 'asNondeterministic')`. Could we at least leave some comments saying that this can be both the wrapped function for a `UserDefinedFunction` and a `UserDefinedFunction` itself?
will add a comment.
```diff
@@ -255,9 +255,26 @@ def registerFunction(self, name, f, returnType=StringType()):
     >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
     >>> spark.sql("SELECT stringLengthInt('test')").collect()
     [Row(stringLengthInt(test)=4)]
```

Let's fix the doc for this too. It says `:param f: python function`, but we could describe that it takes a Python native function, a wrapped function, and a `UserDefinedFunction` too.
ok
```diff
@@ -162,7 +168,8 @@ def wrapper(*args):
         wrapper.func = self.func
         wrapper.returnType = self.returnType
         wrapper.evalType = self.evalType
-        wrapper.asNondeterministic = self.asNondeterministic
+        wrapper.deterministic = self.deterministic
+        wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()
```

Can we do:

```python
wrapper.asNondeterministic = functools.wraps(
    self.asNondeterministic)(lambda: self.asNondeterministic()._wrapped())
```

so that it can produce a proper pydoc when we do `help(udf(lambda: 1, "integer").asNondeterministic)` (not `help(udf(lambda: 1, "integer").asNondeterministic())`)?
good to know the difference
I will leave this unchanged. Maybe you can submit a follow-up PR to address it?
Definitely. Will give it a try within the following week though...
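The distinction being drawn above can be shown concretely: `help(x.asNondeterministic)` documents the attribute itself, while `help(x.asNondeterministic())` documents whatever the call returns. All names below are invented stand-ins, not PySpark's API:

```python
import functools
import pydoc

def asNondeterministic():
    """Marks the UDF as nondeterministic (illustrative)."""

def returned_wrapper():
    """The wrapped UDF returned by asNondeterministic (illustrative)."""
    return 1

# Without functools.wraps the attribute is an anonymous lambda...
plain_attr = lambda: returned_wrapper

# ...with functools.wraps it carries asNondeterministic's own pydoc.
wrapped_attr = functools.wraps(asNondeterministic)(lambda: returned_wrapper)

# help(f.asNondeterministic): documents the attribute (method) itself.
doc_of_attr = pydoc.render_doc(wrapped_attr, renderer=pydoc.plaintext)

# help(f.asNondeterministic()): documents the object the call returns.
doc_of_result = pydoc.render_doc(wrapped_attr(), renderer=pydoc.plaintext)
```

So wrapping only the *returned* object fixes the second form, and the `functools.wraps` suggestion fixes the first.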
```diff
@@ -172,5 +179,5 @@ def asNondeterministic(self):

         .. versionadded:: 2.3
         """
-        self._deterministic = False
+        self.deterministic = False
```

Can we call it `udfDeterministic` to be consistent with the Scala side? (See spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala, line 33 in ff48b1b: `udfDeterministic: Boolean`.) The opposite works fine to me too if that's possible in any way.
`deterministic` is used in UserDefinedFunction.scala. Users can use it to check whether this UDF is deterministic or not.
cc @mgaido91 since you touched related code lately.
Looks fine to me otherwise, BTW.
python/pyspark/sql/catalog.py (outdated)

```diff
@@ -227,15 +227,15 @@ def dropGlobalTempView(self, viewName):
     @ignore_unicode_prefix
     @since(2.0)
     def registerFunction(self, name, f, returnType=StringType()):
-        """Registers a python function (including lambda function) as a UDF
+        """Registers a Python function (including lambda function) or a wrapped/native UDF
```

I'm really confused when reading this document; it would be much clearer to me if we could just say: "Registers a Python function (including lambda function) or a :class:`UserDefinedFunction` as a UDF". This wrapping logic was added in #16534; is it really worth it?
It indeed added some complexity. However, I believe nothing is blocked by #16534 now, if I understand correctly. The changes in #16534 are quite nice because, IMHO, Python users probably use `help()` and `dir()` more frequently than reading the API doc on the website. For a set of UDFs provided as a library, I think that's quite worth keeping. How about leaving this wrapper logic as is for now, and bringing this discussion back when something is actually blocked (or becomes too complicated) by it?
Another idea, just in case it helps: "Registers a Python function as a UDF or a user defined function."
BTW, to be honest, I remember I gave several quick tries to get rid of the wrapper while keeping the docstring correct at that time, but I failed to find a good alternative. Might be good to try if there is a clever way to get rid of the wrapper but keep the doc.
SGTM
Test build #85655 has finished for PR 20137 at commit
LGTM except #20137 (comment), but I will make a followup soon. Fixing it here works fine to me too.
Test build #85657 has finished for PR 20137 at commit
```python
# This is to check whether the input function is a wrapped/native UserDefinedFunction
if hasattr(f, 'asNondeterministic'):
    udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
                              evalType=PythonEvalType.SQL_BATCHED_UDF,
```

cc @ueshin @icexelloss, shall we support registering pandas UDFs here too?
seems we can support it by just changing `evalType=PythonEvalType.SQL_BATCHED_UDF` to `evalType=f.evalType`
+1, but I think there's no way to use a group map UDF in SQL syntax if I understood correctly. I think we can safely fail fast for now as well.
SGTM
Will support the pandas UDF as a separate PR.
+1 too
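The `evalType`-preserving idea discussed above can be sketched in isolation. Everything here is an illustrative stand-in: the constants mimic eval-type codes (the real ones live in PySpark's `PythonEvalType`), and `FakeUDF` is not PySpark's class:

```python
# Illustrative eval-type codes, not PySpark's actual constants.
SQL_BATCHED_UDF = 100
SQL_PANDAS_UDF = 200  # hypothetical name for a pandas eval type

class FakeUDF(object):
    def __init__(self, func, evalType):
        self.func = func
        self.evalType = evalType

    def asNondeterministic(self):
        return self

_registry = {}

def register(name, f):
    if hasattr(f, 'asNondeterministic'):
        # Re-registering an existing UDF: keep its evalType instead of
        # hard-coding SQL_BATCHED_UDF, so pandas UDFs stay pandas UDFs.
        _registry[name] = FakeUDF(f.func, evalType=f.evalType)
    else:
        # A plain Python function defaults to the batched eval type.
        _registry[name] = FakeUDF(f, evalType=SQL_BATCHED_UDF)
    return _registry[name]

plain = register("plain", lambda: 1)
pandas_like = register("pandas", FakeUDF(lambda: 2, SQL_PANDAS_UDF))
```

With the hard-coded version, `pandas_like.evalType` would silently degrade to the batched type; forwarding `f.evalType` is the one-line change proposed in the thread.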
Thanks! Merged to master and 2.3
## What changes were proposed in this pull request?

```python
import random
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType

random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
spark.catalog.registerFunction("random_udf", random_udf, StringType())
spark.sql("SELECT random_udf()").collect()
```

We will get the following error.

```
Py4JError: An error occurred while calling o29.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
	at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
	at py4j.Gateway.invoke(Gateway.java:274)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:214)
	at java.lang.Thread.run(Thread.java:745)
```

This PR is to support it.

## How was this patch tested?

WIP

Author: gatorsmile <[email protected]>

Closes #20137 from gatorsmile/registerFunction.

(cherry picked from commit 5aadbc9)
Signed-off-by: gatorsmile <[email protected]>
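The shape of this failure and of the fix can be mimicked without a JVM. Everything below is a hypothetical stand-in: `JavaHandle` plays the role of the py4j `JavaObject` a registered UDF holds, and `register` shows the fix in spirit (extract the plain Python callable instead of serializing the whole UDF object):

```python
import pickle

class JavaHandle(object):
    """Stand-in for a py4j JavaObject: refuses to be serialized."""
    def __reduce__(self):
        raise TypeError("JVM objects cannot be serialized")

class FakeUDF(object):
    """Hypothetical UDF object holding both a Python function and a JVM handle."""
    def __init__(self, func):
        self.func = func
        self._judf = JavaHandle()  # the part that breaks naive serialization

def top_level_func():
    return 42

def register(f):
    # The fix in spirit: pull out the plain Python callable (f.func) from a
    # UDF-like object before serializing, rather than pickling the object.
    func = f.func if hasattr(f, 'func') else f
    return pickle.loads(pickle.dumps(func))

udf_obj = FakeUDF(top_level_func)

# Serializing the UDF object itself fails, mirroring the Py4JError above.
try:
    pickle.dumps(udf_obj)
    failed = False
except TypeError:
    failed = True

restored = register(udf_obj)
```

The real error surfaces through py4j rather than pickle, but the root cause is the same: the registration path tried to ship an object that carries a JVM reference instead of the underlying function.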