[SPARK-16921][PYSPARK] RDD/DataFrame persist()/cache() should return Python context managers #14579

Closed
wants to merge 1 commit

Conversation

MLnick
Contributor

@MLnick MLnick commented Aug 10, 2016

JIRA: https://issues.apache.org/jira/browse/SPARK-16921

Context managers are a natural way to capture closely related setup and teardown code in Python. It can be useful to apply this pattern to persisting/unpersisting RDDs and DataFrames.

This PR makes RDDs and DataFrames implement the context manager __enter__ and __exit__ methods, allowing code such as:

with labeled_data.persist():
    model = pipeline.fit(labeled_data)

How was this patch tested?

New doc tests.
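The approach can be sketched outside Spark with a stand-in class (FakeRDD and its methods here are illustrative, not the real pyspark.rdd.RDD):

```python
# Minimal sketch of the context-manager approach in this PR, using a
# stand-in class. Assumption: persist()/unpersist() toggle an is_cached
# flag, loosely mirroring PySpark's behavior.
class FakeRDD:
    def __init__(self, data):
        self.data = data
        self.is_cached = False

    def persist(self):
        self.is_cached = True
        return self  # returning self keeps chaining like rdd.persist().count() working

    def unpersist(self):
        self.is_cached = False
        return self

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Teardown runs on block exit, even if the body raised.
        self.unpersist()


rdd = FakeRDD([1, 2, 3])
with rdd.persist() as cached:
    assert cached.is_cached    # persisted inside the block
assert not rdd.is_cached       # automatically unpersisted on exit
```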

@SparkQA

SparkQA commented Aug 10, 2016

Test build #63520 has finished for PR 14579 at commit 2b4e56e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MLnick
Contributor Author

MLnick commented Aug 10, 2016

cc @nchammas @holdenk @rxin

Note:
This is implemented by adding the __enter__ and __exit__ methods to RDD/DataFrame directly. This allows some potentially weird usage: any instance of RDD/DF (including the result of any method that returns self) can be used in a with statement, e.g. this works:

with rdd.map(lambda x: x) as x:
    ...

Clearly this doesn't make a lot of sense. However, I looked at the 2 options of (a) a separate context manager wrapper class returned by persist; and (b) trying to dynamically add the methods (or at least __enter__) in persist.

The problem with (a) is that persist needs to return an RDD/DF instance, so this breaks chaining behavior such as rdd.cache().count() etc.

The problem with (b) is that the special method __enter__ is called in the context of with as type(rdd).__enter__(rdd) (see PEP 343). So it does not help to add a method dynamically to an instance, it must be done to the class. In this case, then after the first with statement usage, all existing and future instances of RDD/DF have the __enter__ method, putting us in the same situation as the approach in this PR of having the methods defined on the class (with associated allowed "weirdness").
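A quick pure-Python check of that lookup behavior (no Spark needed; the Plain class is just an illustration):

```python
# Demonstration that `with` looks up __enter__/__exit__ on the *type*,
# not the instance, per PEP 343.
class Plain:
    pass

obj = Plain()
obj.__enter__ = lambda: obj           # attached to the instance only
obj.__exit__ = lambda *exc: None

try:
    with obj:                         # CPython consults type(obj), finds nothing
        pass
    entered_via_instance = True
except (AttributeError, TypeError):   # AttributeError pre-3.11, TypeError after
    entered_via_instance = False

assert not entered_via_instance

# Patching the *class* works, but then every Plain instance -- existing and
# future -- becomes usable in a with statement, which is exactly the
# "weirdness" described above.
Plain.__enter__ = lambda self: self
Plain.__exit__ = lambda self, *exc: None
with Plain() as p:
    assert isinstance(p, Plain)
```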

So, if we want to avoid that, the only option I see is a variant of (a) above - adding a cached/persisted method that returns a context manager, so it would look like this:

with cached(rdd) as x:
    x.count()

This is less "elegant" but more explicit.

Any other smart ideas for handling option (b) above, please do shout!

@nchammas
Contributor

Thanks @MLnick for taking this on and for breaking down what you've found so far.

I took a look through contextlib for inspiration, and I wonder if the source code for closing() offers a template we can follow that would let persist() return an RDD/DataFrame instance with the correct magic methods, without having to modify the class.

Have you taken a look at that?

@MLnick
Contributor Author

MLnick commented Aug 10, 2016

@nchammas I looked at the @contextmanager decorator. It is an easy way to create a method that returns a context manager, but it is essentially only usable in a with statement, as it returns a contextlib.GeneratorContextManager. For this use case it does not solve the issue that we need to return the RDD/DF instance from cache.

As far as I can see, the closing helper is itself just a context manager, so it amounts to the final variant of option (a) mentioned above, i.e. with cached(rdd) as ...

@nchammas
Contributor

nchammas commented Aug 10, 2016

Ah, you're right.

So if we want to avoid needing magic methods in the main RDD/DataFrame classes and avoid needing a separate utility method like cached(), I think one option available to us is to have separate PersistedRDD and PersistedDataFrame classes that simply wrap the base RDD and DataFrame classes and add the appropriate magic methods.

.persist() and .cache() would then return instances of these classes, which should satisfy the type(x).__enter__(x) behavior while still preserving backwards compatibility and method chaining.

What do you think of that?
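A minimal sketch of that idea (the FakeRDD / PersistedFakeRDD names are illustrative stand-ins; the real classes would live in pyspark.rdd and pyspark.sql):

```python
# Hypothetical wrapper subclass: persist() returns a PersistedFakeRDD,
# which adds only the context-manager protocol on top of the base class.
class FakeRDD:
    def __init__(self, data):
        self.data = data
        self.is_cached = False

    def persist(self):
        self.is_cached = True
        return PersistedFakeRDD(self)

    def count(self):
        return len(self.data)


class PersistedFakeRDD(FakeRDD):
    """Subclass adding __enter__/__exit__ so type(x).__enter__(x) resolves."""

    def __init__(self, parent):
        super().__init__(parent.data)
        self.is_cached = True
        self._parent = parent

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Unpersist both the wrapper and the original on block exit.
        self._parent.is_cached = False
        self.is_cached = False


rdd = FakeRDD([1, 2, 3])
with rdd.persist() as p:
    assert p.count() == 3      # all base-class methods still available
assert not rdd.is_cached       # unpersisted when the block exits
```

This preserves method chaining, since the wrapper is still an instance of the base class.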

@nchammas
Contributor

None of our options seems great, but if I had to rank them I would say:

  1. Add new Persisted... classes.
  2. Make no changes.
  3. Add separate persisted() or cached() utility method.
  4. Modify base RDD and DataFrame classes.

Adding new internal classes for this use-case honestly seems a bit heavy-handed to me, so if we are against that then I would lean towards not doing anything.

@rxin
Contributor

rxin commented Aug 10, 2016

cc @davies

@holdenk
Contributor

holdenk commented Aug 10, 2016

One minor thing to keep in mind - the subclassing of RDD approach could cause us to miss out on pipelining if the RDD was used again after it was unpersisted - but I think that is a relatively minor issue.

On the whole I think modifying the base RDD and DataFrame classes (option A / option 4), which is the one @MLnick has implemented here, is probably one of the more reasonable options - the with statement doesn't add anything if the RDD/DataFrame isn't persisted but can do cleanup if it is.

But if there is a better way to do this I'd be excited to find out as well :)

@nchammas
Contributor

the subclassing of RDD approach could cause us to miss out on pipelining if the RDD was used again after it was unpersisted

How so? Wouldn't __exit__() simply return the parent RDD or DataFrame object?

@holdenk
Contributor

holdenk commented Aug 10, 2016

@nchammas so if we go with the subclassing approach but keep the current cache/persist interface (e.g. no special utility function) a user could easily write something like:
magic = rdd.persist()

with magic as awesome:
    awesome.count()

magic.map(lambda x: x + 1)

I don't believe __exit__() could easily return a result that would update magic to be rdd (in fact __exit__() generally doesn't return a result; instead it's expected to do the teardown logic internally).

:py:meth:`cache` can be used in a 'with' statement. The RDD will be automatically
unpersisted once the 'with' block is exited. Note however that any actions on the RDD
that require the RDD to be cached, should be invoked inside the 'with' block; otherwise,
caching will have no effect.
Contributor


Super minor documentation suggestion - but I was thinking maybe a versionchanged directive could be helpful to call out that it's new functionality (both in RDD and DF)?

Contributor


Agreed, especially since this is technically a new Public API that we are potentially committing to for the life of the 2.x line.

@nchammas
Contributor

Sorry, you're right, __exit__()'s return value is not going to be consumed anywhere. What I meant is that unpersist() would return the base RDD or DataFrame object.

But I'm not seeing the issue with the example you posted. Reformatting for clarity:

magic = rdd.persist()

with magic as awesome:
    awesome.count()

magic.map(lambda x: x + 1)

Are you saying magic.map() will error? Why would it?

magic would be an instance of PersistedRDD, which in turn is a subclass of RDD, which has map() and all of the usual methods defined, plus the magic methods we need for the context manager.

@MLnick
Contributor Author

MLnick commented Aug 10, 2016

yeah it would break pipelining - I don't think it will necessarily throw an error though.

e.g.

In [22]: rdd = sc.parallelize(["b", "a", "c"])

In [23]: type(rdd)
Out[23]: pyspark.rdd.RDD

In [24]: mapped = rdd.map(lambda x: x)

In [25]: type(mapped)
Out[25]: pyspark.rdd.PipelinedRDD

In [26]: mapped._is_pipelinable()
Out[26]: True

In [27]: p = mapped.cache()

In [28]: type(p)
Out[28]: pyspark.rdd.PersistedRDD

In [29]: p._is_pipelinable()
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-29-02496125eccd> in <module>()
----> 1 p._is_pipelinable()

AttributeError: 'PersistedRDD' object has no attribute '_is_pipelinable'

In [30]: mapped2 = p.map(lambda x: x)

In [31]: type(mapped2)
Out[31]: pyspark.rdd.PipelinedRDD

So I think chaining will work, but the pipelined RDD thinks mapped2 is the 1st transformation, while it is actually the 2nd. I think this will just be an efficiency issue rather than a correctness issue however.

We could possibly work around it with some type checking etc but it then starts to feel like adding more complexity than the feature is worth...

@nchammas
Contributor

Ah, I see. I don't fully understand how PipelinedRDD works or how it is used so I'll have to defer to y'all on this. Does the cached() utility method have this same problem?

We could possibly work around it with some type checking etc but it then starts to feel like adding more complexity than the feature is worth...

Agreed.

At this point, actually, I'm beginning to feel this feature is not worth it.

Context managers seem to work best when the objects they're working on have clear open/close-style semantics. File handles, network connections, and the like fit this pattern well.

In fact, the doc for with says:

This allows common try...except...finally usage patterns to be encapsulated for convenient reuse.

RDDs and DataFrames, on the other hand, don't have a simple open/close or try...except...finally pattern. And when we try to map one onto persist and unpersist, we get the various side-effects we've been discussing here.

@holdenk
Contributor

holdenk commented Aug 11, 2016

Right I wouldn't expect it to error with subclassing - just not pipeline successfully - but only in a very long shot corner case.

I think the try/finally with persistence is not an uncommon pattern (we have something similar happen frequently inside of Spark ML/mllib, but it's in Scala code).

@MLnick
Contributor Author

MLnick commented Aug 11, 2016

@nchammas a utility method (e.g. cached) - actually it will be a context manager class implemented in the same way as closing - will work because it only needs to return the context manager, not the RDD instance. So there is no chaining requirement, and it will only work in a with statement.

@MLnick
Contributor Author

MLnick commented Aug 11, 2016

After looking at it and considering all the above, I would say the options are (1) do nothing; or (2) if we want to support this use case, then we implement a single utility method (I would say called persisted) that is a context manager.

Even though we could almost achieve things with subclassing, we do sort of break something and add too much risk/complexity vs reward of the feature IMHO.

@nchammas
Contributor

So there is no chaining requirement, and it will only work in a with statement.

@MLnick - Couldn't we also create a scenario (like @holdenk did earlier) where a user does something like this?

persisted_rdd = persisted(rdd)
persisted_rdd.map(...).filter(...).count()

This would break pipelining too, no?

And I think the expectation would be for it not to break pipelining, because existing common context managers in Python don't have a requirement that they must be used in a with block.

For example, f = open(file) works fine, as does s = requests.Session(), and the resulting objects have the same behavior as they would inside a with block.

@holdenk
Contributor

holdenk commented Aug 11, 2016

@nchammas to be clear - subclassing only breaks pipelining if the persisted_rdd is later unpersisted (e.g. used in a with statement or otherwise) - otherwise you can't pipeline on top of a persisted RDD anyway (which is why I say it's a corner case).

@nchammas
Contributor

Hmm, OK I see. (Apologies, I don't understand what pipelined RDDs are for, so the examples are going a bit over my head. 😅)

@holdenk
Contributor

holdenk commented Aug 11, 2016

Sure - at a high level (and not exactly on point): copying data out of Python back to the JVM is slow, so if we have multiple Python operations that can go into the same task then we try to put them together. Since caching is handled by storing the data inside the JVM, a cached RDD can't be pipelined, since we need to copy the result to the JVM. You can see the details in the PipelinedRDD in rdd.py.

@nchammas
Contributor

Thanks for the quick overview. That's pretty straightforward, actually! I'll take a look at PipelinedRDD for the details. 👍

@MLnick
Contributor Author

MLnick commented Aug 11, 2016

@nchammas to answer your question above (#14579 (comment)) - in short no.

The semantics of the utility method will be the same as for the closing example. persisted will return a wrapper class that implements the context manager methods. When entering the with statement, the __enter__ method is called, which returns the underlying rdd instance - this is in turn bound to the variable following as. So it would look something like this:

class persisted():
    def __init__(self, thing):
        self.thing = thing
    def __enter__(self):
        return self.thing
    def __exit__(self, *exc_info):
        self.thing.unpersist()

If someone tries to do persisted(rdd).map(...) it will throw an AttributeError.

The "file-like" version is what is currently implemented in this PR, and it works if __enter__ returns self which is what file et al do. Of course those classes seem to not tend to have other methods that return self so don't suffer the same chaining issue we run into with RDD.
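A runnable illustration of the persisted helper sketched above (FakeRDD is an illustrative stand-in with just the persist()/unpersist() methods the helper needs):

```python
# The closing()-style helper: __enter__ hands back the wrapped RDD,
# __exit__ unpersists it. Mirrors the class sketched in the comment above.
class persisted:
    def __init__(self, thing):
        self.thing = thing

    def __enter__(self):
        return self.thing  # bound to the variable after `as`

    def __exit__(self, *exc_info):
        self.thing.unpersist()


class FakeRDD:
    def __init__(self):
        self.is_cached = False

    def persist(self):
        self.is_cached = True
        return self

    def unpersist(self):
        self.is_cached = False


rdd = FakeRDD()
with persisted(rdd.persist()) as r:
    assert r is rdd and r.is_cached   # the underlying RDD, not the wrapper
assert not rdd.is_cached

# Chaining off the wrapper fails, since it is not an RDD:
try:
    persisted(rdd).map(lambda x: x)
    raised = False
except AttributeError:
    raised = True
assert raised
```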

@@ -188,6 +188,12 @@ def __init__(self, jrdd, ctx, jrdd_deserializer=AutoBatchedSerializer(PickleSeri
self._id = jrdd.id()
self.partitioner = None

def __enter__(self):
Contributor

@MechCoder MechCoder Aug 15, 2016


Would it be reasonable to raise an error, saying that the context manager is meant to work only with cached RDDs (DataFrames), whenever self.is_cached is not set to True? That would solve problems such as the usage with rdd.map(lambda x: x) as x:.

Contributor Author


We could do it - but users can still then do with rdd.cache().map(...) as x: and it would be valid. So it doesn't fully solve the issue.

Contributor


Is that true? Doesn't it call __enter__ on the instance of rdd.cache().map(...) where is_cached is set to False?

Quick verification:

def __enter__(self):
    if self.is_cached:
        return self
    else:
        raise ValueError("r")

with rdd.cache().map(lambda x: x) as t:
    pass

raises a ValueError

Contributor


Thats an interesting approach @MechCoder I think that could be a way to clarify how to expect to use the context manager to users.

Contributor


yes, also known as the "if you don't know what to do, raise an error" approach :p

Contributor Author


hmmm, yes this does happen to work, because most operations boil down to something like mapPartitions, which creates a new PipelinedRDD that is not cached, or a new RDD which is again not cached.

I think it will work for DataFrame too for a similar reason - most operations return a new DataFrame instance.

@MLnick
Contributor Author

MLnick commented Aug 25, 2016

@nchammas @holdenk @davies @rxin how about the approach of @MechCoder in #14579 (comment)?

I think this will work well, so we could raise an error to prevent (almost all I think) usages outside of the intended pattern of with some_rdd.cache() as x: or with some_rdd_already_cached as x:
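That guard can be sketched end to end with a stand-in class (FakeRDD and its methods are illustrative, not the real PySpark API):

```python
# Sketch of the is_cached guard @MechCoder suggested: __enter__ raises
# unless the RDD is actually cached, so only the intended patterns work.
class FakeRDD:
    def __init__(self):
        self.is_cached = False

    def cache(self):
        self.is_cached = True
        return self

    def map(self, f):
        # Transformations return a new, *uncached* instance, which is why
        # `with rdd.cache().map(...)` is rejected by the guard below.
        return FakeRDD()

    def __enter__(self):
        if not self.is_cached:
            raise ValueError("context manager is only supported for cached RDDs")
        return self

    def __exit__(self, *exc_info):
        self.is_cached = False


rdd = FakeRDD()
with rdd.cache() as c:          # intended pattern: allowed
    assert c.is_cached

try:
    with rdd.cache().map(lambda x: x) as t:   # map() result is uncached
        pass
    raised = False
except ValueError:
    raised = True
assert raised                   # unintended pattern: rejected
```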

@holdenk
Contributor

holdenk commented Aug 25, 2016

I like it personally - if no one has a good reason why not it seems like a very reasonable approach.

@nchammas
Contributor

Looks good to me. 👍

@holdenk
Contributor

holdenk commented Oct 1, 2016

@MLnick still interested in updating this? (Just looking over the older Python PRs) :)

@MLnick
Contributor Author

MLnick commented Oct 3, 2016

Yup! just been snowed under :( but will update with the approach above asap.

@holdenk
Contributor

holdenk commented Nov 1, 2016

Just pinging to see how it's going?

@MLnick
Contributor Author

MLnick commented Nov 2, 2016

I hope to get to this soon - it's just the test cases that I need to get to also!

@holdenk
Contributor

holdenk commented Nov 26, 2016

just a gentle ping - would be cool to add this for 2.1 if we have the time :)

@HyukjinKwon
Member

Is there any reason why it is not merged yet? I personally like this too.

@holdenk
Contributor

holdenk commented Feb 24, 2017

Do you have time to update this @MLnick or maybe would it be OK if someone else made an updated PR based on this? It would be a nice feature to have for 2.2 :)

@HyukjinKwon
Member

gentle ping.

@ueshin
Member

ueshin commented Jun 20, 2017

@MLnick Hi, are you still working on this? If so, could you fix conflicts and update please?

@holdenk
Contributor

holdenk commented Jul 2, 2017

@MLnick - or if you don't have a chance would it be ok for us to find someone (perhaps someone new to the project) to take this over and bring it to the finish line?

@MLnick
Contributor Author

MLnick commented Aug 2, 2017

Hey all - sorry I haven't been able to focus on this. It shouldn't be tough to do, but it will need some tests. If we can find someone who wants to take it over I think it makes a decent starter task :)
