From 8216b6bb52082883fc9b212cd9ab21227f2b8491 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Tue, 2 Jan 2018 23:28:19 +0800
Subject: [PATCH 01/13] wip

---
 python/pyspark/sql/catalog.py | 22 ++++++++++++++++++----
 python/pyspark/sql/udf.py     | 17 +++++++++++++++++
 2 files changed, 35 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 659bc65701a0c..637bd6210405b 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -255,11 +255,25 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> _ = spark.udf.register("stringLengthInt", len, IntegerType())
         >>> spark.sql("SELECT stringLengthInt('test')").collect()
         [Row(stringLengthInt(test)=4)]
+
+        >>> import random
+        >>> from pyspark.sql.functions import udf
+        >>> from pyspark.sql.types import IntegerType, StringType
+        >>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic()
+        >>> spark.catalog.registerFunction("random_udf", random_udf, StringType())
+        >>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP
+        [Row(random_udf()=u'82')]
         """
-        udf = UserDefinedFunction(f, returnType=returnType, name=name,
-                                  evalType=PythonEvalType.SQL_BATCHED_UDF)
-        self._jsparkSession.udf().registerPython(name, udf._judf)
-        return udf._wrapped()
+
+        if hasattr(f, 'asNondeterministic'):
+            udf = f._set_name(name, returnType)
+            self._jsparkSession.udf().registerPython(name, udf._judf)
+            return udf._wrapped()
+        else:
+            udf = UserDefinedFunction(f, returnType=returnType, name=name,
+                                      evalType=PythonEvalType.SQL_BATCHED_UDF)
+            self._jsparkSession.udf().registerPython(name, udf._judf)
+            return udf._wrapped()
 
     @since(2.0)
     def isCached(self, tableName):
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 54b5a8656e1c8..cfb6d51b55d18 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -138,6 +138,19 @@ def __call__(self, *cols):
         sc = SparkContext._active_spark_context
         return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
 
+    def _set_name(self, name, returnType=StringType()):
+        """
+        Updates the name of UserDefinedFunction.
+ """ + # reset _judf + self._judf_placeholder = None + self._returnType_placeholder = None + self._name = name or ( + func.__name__ if hasattr(func, '__name__') + else func.__class__.__name__) + self._returnType = returnType + return self + def _wrapped(self): """ Wrap this udf with a function and attach docstring from func @@ -163,6 +176,10 @@ def wrapper(*args): wrapper.returnType = self.returnType wrapper.evalType = self.evalType wrapper.asNondeterministic = self.asNondeterministic + wrapper._judf = self._judf + wrapper._create_judf = self._create_judf + wrapper._wrapped = self._wrapped + wrapper._set_name = self._set_name return wrapper From 0ecdf63e8d8178528c46254859c4c0c10b80ba08 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 2 Jan 2018 23:44:40 +0800 Subject: [PATCH 02/13] clean --- python/pyspark/sql/catalog.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 637bd6210405b..91d94d3fd8921 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -267,13 +267,11 @@ def registerFunction(self, name, f, returnType=StringType()): if hasattr(f, 'asNondeterministic'): udf = f._set_name(name, returnType) - self._jsparkSession.udf().registerPython(name, udf._judf) - return udf._wrapped() else: udf = UserDefinedFunction(f, returnType=returnType, name=name, evalType=PythonEvalType.SQL_BATCHED_UDF) - self._jsparkSession.udf().registerPython(name, udf._judf) - return udf._wrapped() + self._jsparkSession.udf().registerPython(name, udf._judf) + return udf._wrapped() @since(2.0) def isCached(self, tableName): From e8d0a4c7c8c9e81fd420195d3cc1a37a3b8459a3 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 2 Jan 2018 23:45:30 +0800 Subject: [PATCH 03/13] rename --- python/pyspark/sql/catalog.py | 2 +- python/pyspark/sql/udf.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 91d94d3fd8921..b04c6c31a6534 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -266,7 +266,7 @@ def registerFunction(self, name, f, returnType=StringType()): """ if hasattr(f, 'asNondeterministic'): - udf = f._set_name(name, returnType) + udf = f._set_name_type(name, returnType) else: udf = UserDefinedFunction(f, returnType=returnType, name=name, evalType=PythonEvalType.SQL_BATCHED_UDF) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index cfb6d51b55d18..53ba2e267ab10 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -138,7 +138,7 @@ def __call__(self, *cols): sc = SparkContext._active_spark_context return Column(judf.apply(_to_seq(sc, cols, _to_java_column))) - def _set_name(self, name, returnType=StringType()): + def _set_name_type(self, name, returnType=StringType()): """ Updates the name of UserDefinedFunction. 
""" @@ -179,7 +179,7 @@ def wrapper(*args): wrapper._judf = self._judf wrapper._create_judf = self._create_judf wrapper._wrapped = self._wrapped - wrapper._set_name = self._set_name + wrapper._set_name_type = self._set_name_type return wrapper From 35e6a4a5ba2750c4bd4c4bcb3d91f16e6ba1fdea Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Jan 2018 08:26:51 +0800 Subject: [PATCH 04/13] skip --- python/pyspark/sql/catalog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index b04c6c31a6534..73490610fb3c0 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -260,7 +260,7 @@ def registerFunction(self, name, f, returnType=StringType()): >>> from pyspark.sql.functions import udf >>> from pyspark.sql.types import IntegerType, StringType >>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic() - >>> spark.catalog.registerFunction("random_udf", random_udf, StringType()) + >>> spark.catalog.registerFunction("random_udf", random_udf, StringType()) # doctest: +SKIP >>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP [Row(random_udf()=u'82')] """ From b89b720eebdbe43bc7f8630c61261e7b9d6f90a7 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Jan 2018 09:04:23 +0800 Subject: [PATCH 05/13] fix --- python/pyspark/sql/catalog.py | 8 ++++++-- python/pyspark/sql/udf.py | 17 ----------------- 2 files changed, 6 insertions(+), 19 deletions(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 73490610fb3c0..4e3d18cb84e12 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -260,13 +260,17 @@ def registerFunction(self, name, f, returnType=StringType()): >>> from pyspark.sql.functions import udf >>> from pyspark.sql.types import IntegerType, StringType >>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic() - >>> spark.catalog.registerFunction("random_udf", random_udf, StringType()) # doctest: +SKIP + >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType()) # doctest: +SKIP >>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP [Row(random_udf()=u'82')] + >>> spark.range(1).select(newRandom_udf()).collect() + [Row(random_udf()=u'62')] """ if hasattr(f, 'asNondeterministic'): - udf = f._set_name_type(name, returnType) + udf = UserDefinedFunction(f.func, returnType=returnType, name=name, + evalType=PythonEvalType.SQL_BATCHED_UDF) + udf = udf if (f._deterministic) else udf.asNondeterministic() else: udf = UserDefinedFunction(f, returnType=returnType, name=name, evalType=PythonEvalType.SQL_BATCHED_UDF) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 53ba2e267ab10..54b5a8656e1c8 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -138,19 +138,6 @@ def __call__(self, *cols): sc = SparkContext._active_spark_context return Column(judf.apply(_to_seq(sc, cols, _to_java_column))) - def _set_name_type(self, name, returnType=StringType()): - """ - Updates the name of UserDefinedFunction. 
- """ - # reset _judf - self._judf_placeholder = None - self._returnType_placeholder = None - self._name = name or ( - func.__name__ if hasattr(func, '__name__') - else func.__class__.__name__) - self._returnType = returnType - return self - def _wrapped(self): """ Wrap this udf with a function and attach docstring from func @@ -176,10 +163,6 @@ def wrapper(*args): wrapper.returnType = self.returnType wrapper.evalType = self.evalType wrapper.asNondeterministic = self.asNondeterministic - wrapper._judf = self._judf - wrapper._create_judf = self._create_judf - wrapper._wrapped = self._wrapped - wrapper._set_name_type = self._set_name_type return wrapper From 320813638b710d26dcebfc004271397d7e76c43f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Jan 2018 09:06:12 +0800 Subject: [PATCH 06/13] skip --- python/pyspark/sql/catalog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 4e3d18cb84e12..9d0d647b282fc 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -263,7 +263,7 @@ def registerFunction(self, name, f, returnType=StringType()): >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType()) # doctest: +SKIP >>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP [Row(random_udf()=u'82')] - >>> spark.range(1).select(newRandom_udf()).collect() + >>> spark.range(1).select(newRandom_udf()).collect() # doctest: +SKIP [Row(random_udf()=u'62')] """ From f0992610854b95e4f1b9964bdf5c62132fd52c93 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 3 Jan 2018 09:39:01 +0800 Subject: [PATCH 07/13] fix --- python/pyspark/sql/catalog.py | 5 +++-- python/pyspark/sql/context.py | 11 +++++++++++ python/pyspark/sql/tests.py | 19 ++++++++++++++----- python/pyspark/sql/udf.py | 1 + 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index 9d0d647b282fc..3e24bc8da5e12 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -259,8 +259,9 @@ def registerFunction(self, name, f, returnType=StringType()): >>> import random >>> from pyspark.sql.functions import udf >>> from pyspark.sql.types import IntegerType, StringType - >>> random_udf = udf(lambda: int(random.random() * 100), IntegerType()).asNondeterministic() - >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType()) # doctest: +SKIP + >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic() + >>> newRandom_udf = spark.catalog.registerFunction( + ... 
"random_udf", random_udf, StringType()) # doctest: +SKIP >>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP [Row(random_udf()=u'82')] >>> spark.range(1).select(newRandom_udf()).collect() # doctest: +SKIP diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index b1e723cdecef3..214c3601a80e6 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -203,6 +203,17 @@ def registerFunction(self, name, f, returnType=StringType()): >>> _ = sqlContext.udf.register("stringLengthInt", lambda x: len(x), IntegerType()) >>> sqlContext.sql("SELECT stringLengthInt('test')").collect() [Row(stringLengthInt(test)=4)] + + >>> import random + >>> from pyspark.sql.functions import udf + >>> from pyspark.sql.types import IntegerType, StringType + >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic() + >>> newRandom_udf = sqlContext.registerFunction( + ... "random_udf", random_udf, StringType()) # doctest: +SKIP + >>> sqlContext.sql("SELECT random_udf()").collect() # doctest: +SKIP + [Row(random_udf()=u'82')] + >>> sqlContext.range(1).select(newRandom_udf()).collect() # doctest: +SKIP + [Row(random_udf()=u'62')] """ return self.sparkSession.catalog.registerFunction(name, f, returnType) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1c34c897eecb5..130f7622d9606 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -378,6 +378,20 @@ def test_udf2(self): [res] = self.spark.sql("SELECT strlen(a) FROM test WHERE strlen(a) > 1").collect() self.assertEqual(4, res[0]) + def test_non_deterministic_udf(self): + import random + from pyspark.sql.functions import udf + random_udf = udf(lambda: random.randint(6, 6), IntegerType()).asNondeterministic() + self.assertEqual(random_udf._deterministic, False) + random_udf1 = self.spark.catalog.registerFunction("randInt", random_udf, StringType()) + self.assertEqual(random_udf1._deterministic, False) + [row] = self.spark.sql("SELECT randInt()").collect() + self.assertEqual(row[0], "6") + [row] = self.spark.range(1).select(random_udf1()).collect() + self.assertEqual(row[0], "6") + [row] = self.spark.range(1).select(random_udf()).collect() + self.assertEqual(row[0], 6) + def test_chained_udf(self): self.spark.catalog.registerFunction("double", lambda x: x + x, IntegerType()) [row] = self.spark.sql("SELECT double(1)").collect() @@ -567,7 +581,6 @@ def test_read_multiple_orc_file(self): def test_udf_with_input_file_name(self): from pyspark.sql.functions import udf, input_file_name - from pyspark.sql.types import StringType sourceFile = udf(lambda path: path, StringType()) filePath = "python/test_support/sql/people1.json" row = self.spark.read.json(filePath).select(sourceFile(input_file_name())).first() @@ -575,7 +588,6 @@ def test_udf_with_input_file_name(self): def test_udf_with_input_file_name_for_hadooprdd(self): from pyspark.sql.functions import udf, input_file_name - from pyspark.sql.types import StringType def filename(path): return path @@ -635,7 +647,6 @@ def test_udf_with_string_return_type(self): def test_udf_shouldnt_accept_noncallable_object(self): from pyspark.sql.functions import UserDefinedFunction - from pyspark.sql.types import StringType non_callable = None self.assertRaises(TypeError, UserDefinedFunction, non_callable, StringType()) @@ -1299,7 +1310,6 @@ def test_between_function(self): df.filter(df.a.between(df.b, df.c)).collect()) def test_struct_type(self): - from pyspark.sql.types import StructType, StringType, 
         struct1 = StructType().add("f1", StringType(), True).add("f2", StringType(), True, None)
         struct2 = StructType([StructField("f1", StringType(), True),
                               StructField("f2", StringType(), True, None)])
@@ -1368,7 +1378,6 @@ def test_parse_datatype_string(self):
                          _parse_datatype_string("a INT, c DOUBLE"))
 
     def test_metadata_null(self):
-        from pyspark.sql.types import StructType, StringType, StructField
         schema = StructType([StructField("f1", StringType(), True, None),
                              StructField("f2", StringType(), True, {'a': None})])
         rdd = self.sc.parallelize([["a", "b"], ["c", "d"]])
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 54b5a8656e1c8..0936f0ed2a22e 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -163,6 +163,7 @@ def wrapper(*args):
         wrapper.returnType = self.returnType
         wrapper.evalType = self.evalType
         wrapper.asNondeterministic = self.asNondeterministic
+        wrapper._deterministic = self._deterministic
 
         return wrapper
 

From d1ba7038516b12f8e50038b35e455b23f5b36876 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 3 Jan 2018 12:14:10 +0800
Subject: [PATCH 08/13] try

---
 python/pyspark/sql/udf.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 0936f0ed2a22e..0356642db2b74 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -68,7 +68,8 @@ class UserDefinedFunction(object):
     """
     def __init__(self, func,
                  returnType=StringType(), name=None,
-                 evalType=PythonEvalType.SQL_BATCHED_UDF):
+                 evalType=PythonEvalType.SQL_BATCHED_UDF,
+                 deterministic=True):
         if not callable(func):
             raise TypeError(
                 "Invalid function: not a function or callable (__call__ is not defined): "
@@ -92,7 +93,7 @@ def __init__(self, func,
             func.__name__ if hasattr(func, '__name__')
             else func.__class__.__name__)
         self.evalType = evalType
-        self._deterministic = True
+        self.deterministic = deterministic
 
     @property
     def returnType(self):
@@ -130,7 +131,7 @@ def _create_judf(self):
         wrapped_func = _wrap_function(sc, self.func, self.returnType)
         jdt = spark._jsparkSession.parseDataType(self.returnType.json())
         judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
-            self._name, wrapped_func, jdt, self.evalType, self._deterministic)
+            self._name, wrapped_func, jdt, self.evalType, self.deterministic)
         return judf
 
     def __call__(self, *cols):
@@ -162,8 +163,8 @@ def wrapper(*args):
         wrapper.func = self.func
         wrapper.returnType = self.returnType
         wrapper.evalType = self.evalType
+        wrapper.deterministic = self.deterministic
         wrapper.asNondeterministic = self.asNondeterministic
-        wrapper._deterministic = self._deterministic
 
         return wrapper
 
@@ -173,5 +174,5 @@ def asNondeterministic(self):
         .. versionadded:: 2.3
         """
-        self._deterministic = False
-        return self
+        self.deterministic = False
+        return self._wrapped()

From 6ac25e67bc345b35525b99d2e8659bb9554a0422 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 3 Jan 2018 12:14:31 +0800
Subject: [PATCH 09/13] try

---
 python/pyspark/sql/catalog.py | 4 ++--
 python/pyspark/sql/tests.py   | 7 +++++--
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 3e24bc8da5e12..04dfb0431f777 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -270,8 +270,8 @@ def registerFunction(self, name, f, returnType=StringType()):
 
         if hasattr(f, 'asNondeterministic'):
             udf = UserDefinedFunction(f.func, returnType=returnType, name=name,
-                                      evalType=PythonEvalType.SQL_BATCHED_UDF)
-            udf = udf if (f._deterministic) else udf.asNondeterministic()
+                                      evalType=PythonEvalType.SQL_BATCHED_UDF,
+                                      deterministic=f.deterministic)
         else:
             udf = UserDefinedFunction(f, returnType=returnType, name=name,
                                       evalType=PythonEvalType.SQL_BATCHED_UDF)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 130f7622d9606..2a4ff3a83ba9e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -382,15 +382,18 @@ def test_non_deterministic_udf(self):
         import random
         from pyspark.sql.functions import udf
         random_udf = udf(lambda: random.randint(6, 6), IntegerType()).asNondeterministic()
-        self.assertEqual(random_udf._deterministic, False)
+        self.assertEqual(random_udf.deterministic, False)
         random_udf1 = self.spark.catalog.registerFunction("randInt", random_udf, StringType())
-        self.assertEqual(random_udf1._deterministic, False)
+        self.assertEqual(random_udf1.deterministic, False)
         [row] = self.spark.sql("SELECT randInt()").collect()
         self.assertEqual(row[0], "6")
         [row] = self.spark.range(1).select(random_udf1()).collect()
         self.assertEqual(row[0], "6")
         [row] = self.spark.range(1).select(random_udf()).collect()
         self.assertEqual(row[0], 6)
+        pydoc.render_doc(lambda: random.randint(6, 6), IntegerType())
+        pydoc.render_doc(random_udf)
+        pydoc.render_doc(random_udf1)
 
     def test_chained_udf(self):

From 85f11bfbfb564acb670097ff4ce520bfbc79b855 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Wed, 3 Jan 2018 16:42:41 +0800
Subject: [PATCH 10/13] fix

---
 python/pyspark/sql/tests.py |  2 +-
 python/pyspark/sql/udf.py   | 10 ++++++----
 2 files changed, 7 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 2a4ff3a83ba9e..712a8ef04c955 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -391,7 +391,7 @@ def test_non_deterministic_udf(self):
         self.assertEqual(row[0], "6")
         [row] = self.spark.range(1).select(random_udf()).collect()
         self.assertEqual(row[0], 6)
-        pydoc.render_doc(lambda: random.randint(6, 6), IntegerType())
+        pydoc.render_doc(udf(lambda: random.randint(6, 6), IntegerType()))
         pydoc.render_doc(random_udf)
         pydoc.render_doc(random_udf1)
 
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 0356642db2b74..5df51e9f3fb3e 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -56,7 +56,8 @@ def _create_udf(f, returnType, evalType):
     )
 
     # Set the name of the UserDefinedFunction object to be the name of function f
-    udf_obj = UserDefinedFunction(f, returnType=returnType, name=None, evalType=evalType)
+    udf_obj = UserDefinedFunction(
+        f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
     return udf_obj._wrapped()
 
@@ -67,7 +68,8 @@ class UserDefinedFunction(object):
     .. versionadded:: 1.3
     """
     def __init__(self, func,
-                 returnType=StringType(), name=None,
+                 returnType=StringType(),
+                 name=None,
                  evalType=PythonEvalType.SQL_BATCHED_UDF,
                  deterministic=True):
         if not callable(func):
@@ -164,7 +166,7 @@ def wrapper(*args):
         wrapper.returnType = self.returnType
         wrapper.evalType = self.evalType
         wrapper.deterministic = self.deterministic
-        wrapper.asNondeterministic = self.asNondeterministic
+        wrapper.asNondeterministic = lambda: self.asNondeterministic()._wrapped()
 
         return wrapper
 
@@ -175,4 +177,4 @@ def asNondeterministic(self):
         .. versionadded:: 2.3
         """
         self.deterministic = False
-        return self._wrapped()
+        return self

From 78e9b2c96204412e78ea1e50c95d52ffd6239228 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 4 Jan 2018 00:54:04 +0800
Subject: [PATCH 11/13] more comment.

---
 python/pyspark/sql/udf.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 5df51e9f3fb3e..5e75eb6545333 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -141,6 +141,9 @@ def __call__(self, *cols):
         sc = SparkContext._active_spark_context
         return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
 
+    # This function is for improving the online help system in the interactive interpreter.
+    # For example, the built-in help / pydoc.help. It wraps the UDF with the docstring and
+    # argument annotation. (See: SPARK-19161)
     def _wrapped(self):
         """
         Wrap this udf with a function and attach docstring from func

From 09a1b89cd44349bcf67fd1214827608988787df6 Mon Sep 17 00:00:00 2001
From: gatorsmile
Date: Thu, 4 Jan 2018 11:41:58 +0800
Subject: [PATCH 12/13] fix.

---
 python/pyspark/sql/catalog.py |  8 ++++----
 python/pyspark/sql/context.py |  7 +++----
 python/pyspark/sql/tests.py   | 29 +++++++++++++++++++----------
 3 files changed, 26 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 04dfb0431f777..cba59aecd15fc 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -227,7 +227,7 @@ def dropGlobalTempView(self, viewName):
     @ignore_unicode_prefix
     @since(2.0)
     def registerFunction(self, name, f, returnType=StringType()):
-        """Registers a python function (including lambda function) as a UDF
+        """Registers a Python function (including lambda function) or a wrapped/native UDF
         so it can be used in SQL statements.
 
         In addition to a name and the function itself, the return type can be
@@ -235,7 +235,7 @@ def registerFunction(self, name, f, returnType=StringType()):
         be done. For any other return type, the produced object must match the specified type.
 
         :param name: name of the UDF
-        :param f: python function
+        :param f: a Python function, or a wrapped/native UserDefinedFunction
         :param returnType: a :class:`pyspark.sql.types.DataType` object
         :return: a wrapped :class:`UserDefinedFunction`
 
@@ -260,14 +260,14 @@ def registerFunction(self, name, f, returnType=StringType()):
         >>> from pyspark.sql.functions import udf
         >>> from pyspark.sql.types import IntegerType, StringType
         >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic()
-        >>> newRandom_udf = spark.catalog.registerFunction(
-        ...     "random_udf", random_udf, StringType()) # doctest: +SKIP
"random_udf", random_udf, StringType()) # doctest: +SKIP + >>> newRandom_udf = spark.catalog.registerFunction("random_udf", random_udf, StringType()) >>> spark.sql("SELECT random_udf()").collect() # doctest: +SKIP [Row(random_udf()=u'82')] >>> spark.range(1).select(newRandom_udf()).collect() # doctest: +SKIP [Row(random_udf()=u'62')] """ + # This is to check whether the input function is a wrapped/native UserDefinedFunction if hasattr(f, 'asNondeterministic'): udf = UserDefinedFunction(f.func, returnType=returnType, name=name, evalType=PythonEvalType.SQL_BATCHED_UDF, diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index 214c3601a80e6..ed1ce69c929e6 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -175,7 +175,7 @@ def range(self, start, end=None, step=1, numPartitions=None): @ignore_unicode_prefix @since(1.2) def registerFunction(self, name, f, returnType=StringType()): - """Registers a python function (including lambda function) as a UDF + """Registers a Python function (including lambda function) or a wrapped/native UDF so it can be used in SQL statements. In addition to a name and the function itself, the return type can be optionally specified. @@ -183,7 +183,7 @@ def registerFunction(self, name, f, returnType=StringType()): be done. For any other return type, the produced object must match the specified type. :param name: name of the UDF - :param f: python function + :param f: a Python function, or a wrapped/native UserDefinedFunction :param returnType: a :class:`pyspark.sql.types.DataType` object :return: a wrapped :class:`UserDefinedFunction` @@ -208,8 +208,7 @@ def registerFunction(self, name, f, returnType=StringType()): >>> from pyspark.sql.functions import udf >>> from pyspark.sql.types import IntegerType, StringType >>> random_udf = udf(lambda: random.randint(0, 100), IntegerType()).asNondeterministic() - >>> newRandom_udf = sqlContext.registerFunction( - ... 
"random_udf", random_udf, StringType()) # doctest: +SKIP + >>> newRandom_udf = sqlContext.registerFunction("random_udf", random_udf, StringType()) >>> sqlContext.sql("SELECT random_udf()").collect() # doctest: +SKIP [Row(random_udf()=u'82')] >>> sqlContext.range(1).select(newRandom_udf()).collect() # doctest: +SKIP diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 712a8ef04c955..7364d98a1de6b 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -378,7 +378,24 @@ def test_udf2(self): [res] = self.spark.sql("SELECT strlen(a) FROM test WHERE strlen(a) > 1").collect() self.assertEqual(4, res[0]) - def test_non_deterministic_udf(self): + def test_udf3(self): + twoargs = self.spark.catalog.registerFunction( + "twoArgs", UserDefinedFunction(lambda x, y: len(x) + y), IntegerType()) + self.assertEqual(twoargs.deterministic, True) + [row] = self.spark.sql("SELECT twoArgs('test', 1)").collect() + self.assertEqual(row[0], 5) + + def test_nondeterministic_udf(self): + from pyspark.sql.functions import udf + import random + udf_random_col = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic() + self.assertEqual(udf_random_col.deterministic, False) + df = self.spark.createDataFrame([Row(1)]).select(udf_random_col().alias('RAND')) + udf_add_ten = udf(lambda rand: rand + 10, IntegerType()) + [row] = df.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).collect() + self.assertEqual(row[0] + 10, row[1]) + + def test_nondeterministic_udf2(self): import random from pyspark.sql.functions import udf random_udf = udf(lambda: random.randint(6, 6), IntegerType()).asNondeterministic() @@ -391,6 +408,7 @@ def test_non_deterministic_udf(self): self.assertEqual(row[0], "6") [row] = self.spark.range(1).select(random_udf()).collect() self.assertEqual(row[0], 6) + # render_doc() reproduces the help() exception without printing output pydoc.render_doc(udf(lambda: random.randint(6, 6), IntegerType())) pydoc.render_doc(random_udf) pydoc.render_doc(random_udf1) @@ -452,15 +470,6 @@ def test_udf_with_array_type(self): self.assertEqual(list(range(3)), l1) self.assertEqual(1, l2) - def test_nondeterministic_udf(self): - from pyspark.sql.functions import udf - import random - udf_random_col = udf(lambda: int(100 * random.random()), IntegerType()).asNondeterministic() - df = self.spark.createDataFrame([Row(1)]).select(udf_random_col().alias('RAND')) - udf_add_ten = udf(lambda rand: rand + 10, IntegerType()) - [row] = df.withColumn('RAND_PLUS_TEN', udf_add_ten('RAND')).collect() - self.assertEqual(row[0] + 10, row[1]) - def test_broadcast_in_udf(self): bar = {"a": "aa", "b": "bb", "c": "abc"} foo = self.sc.broadcast(bar) From 2482e6bcdaf92a78ae6b043a859e10140a273a18 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Thu, 4 Jan 2018 12:49:41 +0800 Subject: [PATCH 13/13] fix --- python/pyspark/sql/catalog.py | 4 ++-- python/pyspark/sql/context.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py index cba59aecd15fc..156603128d063 100644 --- a/python/pyspark/sql/catalog.py +++ b/python/pyspark/sql/catalog.py @@ -227,8 +227,8 @@ def dropGlobalTempView(self, viewName): @ignore_unicode_prefix @since(2.0) def registerFunction(self, name, f, returnType=StringType()): - """Registers a Python function (including lambda function) or a wrapped/native UDF - so it can be used in SQL statements. 
+ """Registers a Python function (including lambda function) or a :class:`UserDefinedFunction` + as a UDF. The registered UDF can be used in SQL statement. In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given it default to a string and conversion will automatically diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py index ed1ce69c929e6..b8d86cc098e94 100644 --- a/python/pyspark/sql/context.py +++ b/python/pyspark/sql/context.py @@ -175,8 +175,8 @@ def range(self, start, end=None, step=1, numPartitions=None): @ignore_unicode_prefix @since(1.2) def registerFunction(self, name, f, returnType=StringType()): - """Registers a Python function (including lambda function) or a wrapped/native UDF - so it can be used in SQL statements. + """Registers a Python function (including lambda function) or a :class:`UserDefinedFunction` + as a UDF. The registered UDF can be used in SQL statement. In addition to a name and the function itself, the return type can be optionally specified. When the return type is not given it default to a string and conversion will automatically