From 29245bf3427621e5a329cc54494af0bb1e88d888 Mon Sep 17 00:00:00 2001
From: Ahir Reddy
Date: Sat, 12 Apr 2014 12:45:32 -0700
Subject: [PATCH] Cache underlying SchemaRDD instead of generating and caching
 PythonRDD

---
 python/pyspark/sql.py | 31 +++++++++++++++++++++++++++++++
 1 file changed, 31 insertions(+)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 094cec6bbee79..67e6eee3f4bd1 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -309,6 +309,37 @@ def _toPython(self):
         # pickle serializer in Pyrolite
         return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d))
 
+    # We override the default cache/persist/checkpoint behavior as we want to cache the underlying
+    # SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class
+    def cache(self):
+        self.is_cached = True
+        self._jschema_rdd.cache()
+        return self
+
+    def persist(self, storageLevel):
+        self.is_cached = True
+        javaStorageLevel = self.ctx._getJavaStorageLevel(storageLevel)
+        self._jschema_rdd.persist(javaStorageLevel)
+        return self
+
+    def unpersist(self):
+        self.is_cached = False
+        self._jschema_rdd.unpersist()
+        return self
+
+    def checkpoint(self):
+        self.is_checkpointed = True
+        self._jschema_rdd.checkpoint()
+
+    def isCheckpointed(self):
+        return self._jschema_rdd.isCheckpointed()
+
+    def getCheckpointFile(self):
+        checkpointFile = self._jschema_rdd.getCheckpointFile()
+        if checkpointFile.isDefined():
+            return checkpointFile.get()
+        else:
+            return None
 
 def _test():
     import doctest