
[SPARK-13697][PySpark] Fix the missing module name of TransformFunctionSerializer.loads #11535

Closed · wants to merge 2 commits into master from zsxwing:loads-module

Conversation

@zsxwing (Member) commented Mar 5, 2016

What changes were proposed in this pull request?

Set the function's module name to __main__ if it's missing in TransformFunctionSerializer.loads.
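
For reference, a minimal sketch of that guard (simplified; the real `loads` in `pyspark/streaming/util.py` also records `self.failure` and wraps its body in `try/except`):

```python
def loads(self, data):
    # Unpickle the transform function, its RDD wrapper, and the deserializers.
    f, wrap_func, deserializers = self.serializer.loads(bytes(data))
    # Functions defined in the PySpark shell can come back with
    # __module__ == None; fall back to "__main__", where shell-defined
    # functions live.
    if f.__module__ is None:
        f.__module__ = "__main__"
    if wrap_func.__module__ is None:
        wrap_func.__module__ = "__main__"
    return TransformFunction(self.ctx, f, *deserializers).rdd_wrapper(wrap_func)
```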

How was this patch tested?

Manually tested in the shell.

Before this patch:

>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.util import TransformFunction
>>> ssc = StreamingContext(sc, 1)
>>> func = TransformFunction(sc, lambda x: x, sc.serializer)
>>> func.rdd_wrapper(lambda x: x)
TransformFunction(<function <lambda> at 0x106ac8b18>)
>>> bytes = bytearray(ssc._transformerSerializer.serializer.dumps((func.func, func.rdd_wrap_func, func.deserializers))) 
>>> func2 = ssc._transformerSerializer.loads(bytes)
>>> print(func2.func.__module__)
None
>>> print(func2.rdd_wrap_func.__module__)
None
>>> 

After this patch:

>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.util import TransformFunction
>>> ssc = StreamingContext(sc, 1)
>>> func = TransformFunction(sc, lambda x: x, sc.serializer)
>>> func.rdd_wrapper(lambda x: x)
TransformFunction(<function <lambda> at 0x108bf1b90>)
>>> bytes = bytearray(ssc._transformerSerializer.serializer.dumps((func.func, func.rdd_wrap_func, func.deserializers))) 
>>> func2 = ssc._transformerSerializer.loads(bytes)
>>> print(func2.func.__module__)
__main__
>>> print(func2.rdd_wrap_func.__module__)
__main__
>>> 

@zsxwing (Member, Author) commented Mar 5, 2016

cc @davies

@@ -111,6 +111,10 @@ def loads(self, data):
         self.failure = None
         try:
             f, wrap_func, deserializers = self.serializer.loads(bytes(data))
+            if f.__module__ is None:
Contributor

Could you fix this in CloudPickler?

Member (Author)

It just calls pickle.loads(obj). So we cannot get f and wrap_func there.
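
For context, a rough sketch of what that serializer does (hedged: the real `CloudPickleSerializer` lives in `pyspark/serializers.py` and its exact class hierarchy differs):

```python
import pickle

from pyspark import cloudpickle


class CloudPickleSerializer(object):
    """Sketch only: dump with cloudpickle, load with plain pickle."""

    def dumps(self, obj):
        return cloudpickle.dumps(obj, 2)

    def loads(self, obj):
        # By the time loads() returns, the function objects are already
        # rebuilt, so a per-function fix (like restoring __module__) has to
        # happen either in the caller or inside cloudpickle's reconstruction.
        return pickle.loads(obj)
```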

Contributor

Check out save_function_tuple() and _fill_function() in cloudpickle.py.

@davies (Contributor) commented Mar 5, 2016

diff --git a/python/pyspark/cloudpickle.py b/python/pyspark/cloudpickle.py
index 95b3abc..e56e22a 100644
--- a/python/pyspark/cloudpickle.py
+++ b/python/pyspark/cloudpickle.py
@@ -241,6 +241,7 @@ class CloudPickler(Pickler):
         save(f_globals)
         save(defaults)
         save(dct)
+        save(func.__module__)
         write(pickle.TUPLE)
         write(pickle.REDUCE)  # applies _fill_function on the tuple

@@ -698,13 +699,14 @@ def _genpartial(func, args, kwds):
     return partial(func, *args, **kwds)


-def _fill_function(func, globals, defaults, dict):
+def _fill_function(func, globals, defaults, dict, module):
     """ Fills in the rest of function data into the skeleton function object
         that were created via _make_skel_func().
          """
     func.__globals__.update(globals)
     func.__defaults__ = defaults
     func.__dict__ = dict
+    func.__module__ = module

     return func
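
For background, the missing name comes from how Python builds function objects: `__module__` is taken from the globals dict's `'__name__'` entry, and cloudpickle rebuilds functions from a skeleton with a minimal globals dict. A small demonstration of that behavior (plain Python, not cloudpickle code):

```python
import types

code = (lambda x: x).__code__

# A function rebuilt with a globals dict that has no '__name__' key ends up
# with __module__ == None -- exactly what the shell session above shows.
skel = types.FunctionType(code, {})
print(skel.__module__)  # None
print(skel(42))         # 42 -- the function itself still works
```

Saving `func.__module__` explicitly, as in the diff above, sidesteps that.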

@davies (Contributor) commented Mar 5, 2016

Please add a test case in SerializationTestCase
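
Something along these lines might work (a self-contained sketch only; the real test would go inside the existing SerializationTestCase in python/pyspark/tests.py, and the merged test's name and assertions may differ):

```python
import unittest

from pyspark.serializers import CloudPickleSerializer


class SerializationTestCase(unittest.TestCase):

    def test_function_module_name(self):
        # A lambda defined in __main__ should keep its module name
        # after a cloudpickle round trip.
        ser = CloudPickleSerializer()
        func = lambda x: x
        func2 = ser.loads(ser.dumps(func))
        self.assertEqual(func.__module__, func2.__module__)
```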

@SparkQA commented Mar 5, 2016

Test build #52510 has finished for PR 11535 at commit c4e35ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 5, 2016

Test build #52511 has finished for PR 11535 at commit 60c3068.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zsxwing (Member, Author) commented Mar 6, 2016

Updated and added a unit test. Thanks, @davies

@SparkQA commented Mar 6, 2016

Test build #52524 has finished for PR 11535 at commit 5f269ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 6, 2016

Test build #52525 has finished for PR 11535 at commit c98042b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies (Contributor) commented Mar 6, 2016

LGTM, merging this into master, thanks!

asfgit pushed a commit that referenced this pull request Mar 6, 2016
…ionSerializer.loads

Author: Shixiong Zhu <[email protected]>

Closes #11535 from zsxwing/loads-module.

(cherry picked from commit ee913e6)
Signed-off-by: Davies Liu <[email protected]>

asfgit pushed a commit that referenced this pull request Mar 6, 2016
…ionSerializer.loads

asfgit pushed a commit that referenced this pull request Mar 6, 2016
…ionSerializer.loads

@davies (Contributor) commented Mar 6, 2016

Also merged into 1.4, 1.5, 1.6 branches.

asfgit closed this in ee913e6 Mar 6, 2016
zsxwing deleted the loads-module branch March 6, 2016 22:46
roygao94 pushed a commit to roygao94/spark that referenced this pull request Mar 22, 2016
…ionSerializer.loads
