[SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs for non-deterministic cases

## What changes were proposed in this pull request?

Add tests for using non-deterministic UDFs in aggregate.

Update the pandas_udf docstring with respect to determinism.
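
As a minimal, self-contained sketch of the documented behavior (the session setup and the `rand_udf` name are illustrative, not part of this patch):

```python
# Sketch only: a vectorized UDF producing random values must be marked
# non-deterministic, and still may not appear inside an aggregate function.
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, sum as sql_sum

spark = SparkSession.builder.master('local[1]').getOrCreate()

@pandas_udf('double')
def rand_udf(v):
    # One random double per input row, so the output is non-deterministic.
    return pd.Series(np.random.random(len(v)))

# Without this, the optimizer may eliminate or duplicate invocations.
rand_udf = rand_udf.asNondeterministic()

df = spark.range(10)
df.withColumn('rand', rand_udf(df.id)).show()  # fine in a projection

# Raises AnalysisException mentioning "nondeterministic", as the new
# test_nondeterministic_udf_in_aggregate asserts:
# df.agg(sql_sum(rand_udf(df.id))).collect()
```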

## How was this patch tested?
test_nondeterministic_udf_in_aggregate

Author: Li Jin <[email protected]>

Closes #20142 from icexelloss/SPARK-22930-pandas-udf-deterministic.
icexelloss authored and gatorsmile committed Jan 6, 2018
1 parent bf65cd3 commit f2dd8b9
Showing 2 changed files with 63 additions and 1 deletion.
python/pyspark/sql/functions.py (12 changes: 11 additions & 1 deletion)
@@ -2214,7 +2214,17 @@ def pandas_udf(f=None, returnType=None, functionType=None):
.. seealso:: :meth:`pyspark.sql.GroupedData.apply`
.. note:: The user-defined function must be deterministic.
.. note:: The user-defined functions are considered deterministic by default. Due to
    optimization, duplicate invocations may be eliminated or the function may even be invoked
    more times than it is present in the query. If your function is not deterministic, call
    `asNondeterministic` on the user-defined function. E.g.:
>>> @pandas_udf('double', PandasUDFType.SCALAR)  # doctest: +SKIP
... def random(v):
...     import numpy as np
...     import pandas as pd
...     return pd.Series(np.random.randn(len(v)))
>>> random = random.asNondeterministic()  # doctest: +SKIP
.. note:: The user-defined functions do not support conditional expressions or short-circuiting
    in boolean expressions and they end up being executed all internally. If the functions
    can fail on special rows, the workaround is to incorporate the condition into the functions.
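
To illustrate the workaround that note describes with a hedged sketch (the `safe_log` example is illustrative, not part of this commit): the guard moves into the UDF body, because wrapping the call in a conditional expression would not prevent the UDF from being evaluated on every row.

```python
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('double')
def safe_log(v):
    # Mask non-positive inputs inside the UDF rather than relying on a
    # surrounding CASE WHEN to short-circuit the call.
    return pd.Series(np.log(v.where(v > 0)))
```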
python/pyspark/sql/tests.py (52 changes: 52 additions & 0 deletions)
@@ -386,6 +386,7 @@ def test_udf3(self):
        self.assertEqual(row[0], 5)

    def test_nondeterministic_udf(self):
        # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
        from pyspark.sql.functions import udf
        import random
        udf_random_col = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic()
@@ -413,6 +414,18 @@ def test_nondeterministic_udf2(self):
        pydoc.render_doc(random_udf)
        pydoc.render_doc(random_udf1)

    def test_nondeterministic_udf_in_aggregate(self):
        from pyspark.sql.functions import udf, sum
        import random
        udf_random_col = udf(lambda: int(100 * random.random()), 'int').asNondeterministic()
        df = self.spark.range(10)

        with QuietTest(self.sc):
            with self.assertRaisesRegexp(AnalysisException, "nondeterministic"):
                df.groupby('id').agg(sum(udf_random_col())).collect()
            with self.assertRaisesRegexp(AnalysisException, "nondeterministic"):
                df.agg(sum(udf_random_col())).collect()

    def test_chained_udf(self):
        self.spark.catalog.registerFunction("double", lambda x: x + x, IntegerType())
        [row] = self.spark.sql("SELECT double(1)").collect()
@@ -3567,6 +3580,18 @@ def tearDownClass(cls):
        time.tzset()
        ReusedSQLTestCase.tearDownClass()

    @property
    def random_udf(self):
        from pyspark.sql.functions import pandas_udf

        @pandas_udf('double')
        def random_udf(v):
            import pandas as pd
            import numpy as np
            return pd.Series(np.random.random(len(v)))
        random_udf = random_udf.asNondeterministic()
        return random_udf

    def test_vectorized_udf_basic(self):
        from pyspark.sql.functions import pandas_udf, col
        df = self.spark.range(10).select(
@@ -3950,6 +3975,33 @@ def test_vectorized_udf_timestamps_respect_session_timezone(self):
        finally:
            self.spark.conf.set("spark.sql.session.timeZone", orig_tz)

    def test_nondeterministic_udf(self):
        # Test that nondeterministic UDFs are evaluated only once in chained UDF evaluations
        from pyspark.sql.functions import udf, pandas_udf, col

        @pandas_udf('double')
        def plus_ten(v):
            return v + 10
        random_udf = self.random_udf

        df = self.spark.range(10).withColumn('rand', random_udf(col('id')))
        result1 = df.withColumn('plus_ten(rand)', plus_ten(df['rand'])).toPandas()

        self.assertEqual(random_udf.deterministic, False)
        self.assertTrue(result1['plus_ten(rand)'].equals(result1['rand'] + 10))

    def test_nondeterministic_udf_in_aggregate(self):
        from pyspark.sql.functions import pandas_udf, sum

        df = self.spark.range(10)
        random_udf = self.random_udf

        with QuietTest(self.sc):
            with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                df.groupby(df.id).agg(sum(random_udf(df.id))).collect()
            with self.assertRaisesRegexp(AnalysisException, 'nondeterministic'):
                df.agg(sum(random_udf(df.id))).collect()
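
A hedged usage note (not part of the commit): the typical way around this AnalysisException is to materialize the non-deterministic column in a projection first, then aggregate the materialized column. Assuming the `df` and `random_udf` from the test above:

```python
# Sketch (not in the commit): project the non-deterministic column first,
# then aggregate the now-deterministic attribute reference.
from pyspark.sql.functions import sum as sql_sum

result = df.withColumn('r', random_udf(df.id)).agg(sql_sum('r')).collect()
```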


@unittest.skipIf(not _have_pandas or not _have_arrow, "Pandas or Arrow not installed")
class GroupbyApplyTests(ReusedSQLTestCase):
