[SPARK-16921][PYSPARK] RDD/DataFrame persist()/cache() should return Python context managers #14579

Closed · wants to merge 1 commit
36 changes: 35 additions & 1 deletion python/pyspark/rdd.py
@@ -188,6 +188,12 @@ def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())):
self._id = jrdd.id()
self.partitioner = None

def __enter__(self):
@MechCoder (Contributor), Aug 15, 2016:

Is it reasonable to just raise an error in `__enter__`, saying that the context manager is meant to work only with cached RDDs (DataFrames), if `self.is_cached` is not set to `True`? That would catch misuses such as `with rdd.map(lambda x: x) as x:`.

Contributor (Author):

We could do that, but users could still write `with rdd.cache().map(...) as x:` and it would be valid, so it doesn't fully solve the issue.

Contributor:

Is that true? Doesn't it call `__enter__` on the instance returned by `rdd.cache().map(...)`, where `is_cached` is set to `False`?

Quick verification:

```python
def __enter__(self):
    if self.is_cached:
        return self
    else:
        raise ValueError("RDD is not cached")
```

```python
with rdd.cache().map(lambda x: x) as t:
    pass
```

raises a ValueError.

Contributor:

That's an interesting approach, @MechCoder. I think it could be a way to clarify to users how the context manager is expected to be used.

Contributor:

Yes, also known as the "if you don't know what to do, raise an error" approach :p

Contributor (Author):

Hmm, yes, this does happen to work, because most operations boil down to something like `mapPartitions`, which creates a new `PipelinedRDD` that is not cached, or a new `RDD` that is again not cached.

I think it will work for DataFrame too, for a similar reason: most operations return a new DataFrame instance.
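A quick illustration of that behavior (a sketch, assuming a live SparkContext `sc`; the point is that `cache()` only marks the RDD it is called on):

```python
rdd = sc.parallelize([1, 2, 3])
cached = rdd.cache()               # cache() sets is_cached = True and returns self
derived = cached.map(lambda x: x)  # map() builds a new PipelinedRDD, initially uncached

print(cached.is_cached)   # True
print(derived.is_cached)  # False -- a guarded __enter__ would reject `derived`
```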

return self

def __exit__(self, *args):
self.unpersist()
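For reference, the `with` form expands roughly to the following try/finally (a sketch of Python's standard context-manager protocol, not code from this PR):

```python
# Rough expansion of: with rdd.cache() as cached: cached.count()
cached = rdd.cache().__enter__()       # __enter__ simply returns the RDD
try:
    cached.count()                     # actions that need the cache go inside
finally:
    cached.__exit__(None, None, None)  # always calls unpersist()
```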

def _pickled(self):
return self._reserialize(AutoBatchedSerializer(PickleSerializer()))

@@ -221,6 +227,21 @@ def context(self):
def cache(self):
"""
Persist this RDD with the default storage level (C{MEMORY_ONLY}).

:py:meth:`cache` can be used in a 'with' statement. The RDD will be automatically
unpersisted once the 'with' block is exited. Note, however, that any actions on the
RDD that require it to be cached should be invoked inside the 'with' block;
otherwise, caching will have no effect.
Contributor:

Super minor documentation suggestion, but I was thinking a `versionchanged` directive could be helpful to call out that it's new functionality (in both RDD and DataFrame)?
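A sketch of how that might look in the docstring (the target version number is an assumption here; `versionchanged` is the standard Sphinx directive for this):

```python
def cache(self):
    """
    Persist this RDD with the default storage level (MEMORY_ONLY).

    .. versionchanged:: 2.1.0
       Can be used in a 'with' statement; the RDD is unpersisted on exit.
    """
```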

Contributor:

Agreed, especially since this is technically a new public API that we are potentially committing to for the life of the 2.x line.


>>> rdd = sc.parallelize(["b", "a", "c"])
>>> with rdd.cache() as cached:
... print(cached.getStorageLevel())
... print(cached.count())
...
Memory Serialized 1x Replicated
3
>>> print(rdd.getStorageLevel())
Serialized 1x Replicated
"""
self.is_cached = True
self.persist(StorageLevel.MEMORY_ONLY)
@@ -233,9 +254,22 @@ def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
a new storage level if the RDD does not have a storage level set yet.
If no storage level is specified, it defaults to C{MEMORY_ONLY}.

:py:meth:`persist` can be used in a 'with' statement. The RDD will be automatically
unpersisted once the 'with' block is exited. Note, however, that any actions on the
RDD that require it to be cached should be invoked inside the 'with' block;
otherwise, caching will have no effect.

>>> rdd = sc.parallelize(["b", "a", "c"])
>>> with rdd.persist() as persisted:
... print(persisted.getStorageLevel())
... print(persisted.is_cached)
... print(persisted.count())
...
Memory Serialized 1x Replicated
True
3
>>> print(rdd.getStorageLevel())
Serialized 1x Replicated
"""
self.is_cached = True
javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
26 changes: 26 additions & 0 deletions python/pyspark/sql/dataframe.py
@@ -75,6 +75,12 @@ def __init__(self, jdf, sql_ctx):
self._schema = None # initialized lazily
self._lazy_rdd = None

def __enter__(self):
return self

def __exit__(self, *args):
self.unpersist()
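A quick usage sketch of the DataFrame side (assumes a SparkSession `spark`; since `persist()` returns the DataFrame itself, `df` and `d` are the same object):

```python
df = spark.range(10)
with df.persist() as d:
    print(d.count())   # 10; the cache is live inside the block
print(df.is_cached)    # False: __exit__ called unpersist()
```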

@property
@since(1.3)
def rdd(self):
@@ -390,6 +396,16 @@ def foreachPartition(self, f):
@since(1.3)
def cache(self):
""" Persists with the default storage level (C{MEMORY_ONLY}).

:py:meth:`cache` can be used in a 'with' statement. The DataFrame will be automatically
unpersisted once the 'with' block is exited. Note, however, that any actions on the
DataFrame that require it to be cached should be invoked inside the 'with' block;
otherwise, caching will have no effect.

>>> with df.cache() as cached:
... print(cached.count())
...
2
"""
self.is_cached = True
self._jdf.cache()
@@ -401,6 +417,16 @@ def persist(self, storageLevel=StorageLevel.MEMORY_ONLY):
after the first time it is computed. This can only be used to assign
a new storage level if the DataFrame does not have a storage level set yet.
If no storage level is specified, it defaults to C{MEMORY_ONLY}.

:py:meth:`persist` can be used in a 'with' statement. The DataFrame will be automatically
unpersisted once the 'with' block is exited. Note, however, that any actions on the
DataFrame that require it to be cached should be invoked inside the 'with' block;
otherwise, caching will have no effect.

>>> with df.persist() as persisted:
... print(persisted.count())
...
2
"""
self.is_cached = True
javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
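The context-manager pattern also composes with an explicit storage level (a sketch, reusing the `df` from the doctests above; `StorageLevel.DISK_ONLY` is one of the standard levels):

```python
from pyspark import StorageLevel

with df.persist(StorageLevel.DISK_ONLY) as d:
    print(d.count())  # computed once, then served from disk inside the block
```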