Skip to content

Commit

Permalink
Spark-1163, Added missing Python RDD functions
Browse files Browse the repository at this point in the history
Author: prabinb <[email protected]>

Closes apache#92 from prabinb/python-api-rdd and squashes the following commits:

51129ca [prabinb] Added missing Python RDD functions. Added __repr__ function to StorageLevel class. Added doctest for RDD.getStorageLevel().
  • Loading branch information
prabinb authored and pwendell committed Mar 12, 2014
1 parent 2409af9 commit af7f2f1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
42 changes: 42 additions & 0 deletions python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel

from py4j.java_collections import ListConverter, MapConverter

Expand Down Expand Up @@ -1119,6 +1120,47 @@ def zip(self, other):
other._jrdd_deserializer)
return RDD(pairRDD, self.ctx, deserializer)

def name(self):
    """
    Return the name assigned to this RDD, or None if it is unnamed.
    """
    rdd_name = self._jrdd.name()
    # The Java side may return null/empty; surface that as None.
    return rdd_name.encode('utf-8') if rdd_name else None

def setName(self, name):
    """
    Assign a name to this RDD.

    Returns this RDD (``self``) so the call can be chained fluently,
    e.g. ``sc.parallelize(...).setName('x').cache()``.  Callers that
    ignored the previous ``None`` return are unaffected.

    >>> rdd1 = sc.parallelize([1,2])
    >>> rdd1.setName('RDD1').name()
    'RDD1'
    """
    self._jrdd.setName(name)
    return self

def toDebugString(self):
    """
    A description of this RDD and its recursive dependencies for debugging.
    """
    description = self._jrdd.toDebugString()
    # The Java side may return null/empty; surface that as None.
    return description.encode('utf-8') if description else None

def getStorageLevel(self):
    """
    Get the RDD's current storage level.
    >>> rdd1 = sc.parallelize([1,2])
    >>> rdd1.getStorageLevel()
    StorageLevel(False, False, False, 1)
    """
    # Translate the py4j-proxied Java StorageLevel into the Python one.
    jlevel = self._jrdd.getStorageLevel()
    return StorageLevel(jlevel.useDisk(), jlevel.useMemory(),
                        jlevel.deserialized(), jlevel.replication())

# TODO: `lookup` is disabled because we can't make direct comparisons based
# on the key; we need to compare the hash of the key to the hash of the
Expand Down
4 changes: 4 additions & 0 deletions python/pyspark/storagelevel.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ def __init__(self, useDisk, useMemory, deserialized, replication = 1):
self.deserialized = deserialized
self.replication = replication

def __repr__(self):
    """Return a constructor-style representation of this storage level."""
    fields = (self.useDisk, self.useMemory, self.deserialized, self.replication)
    return "StorageLevel({0}, {1}, {2}, {3})".format(*fields)

StorageLevel.DISK_ONLY = StorageLevel(True, False, False)
StorageLevel.DISK_ONLY_2 = StorageLevel(True, False, False, 2)
StorageLevel.MEMORY_ONLY = StorageLevel(False, True, True)
Expand Down

0 comments on commit af7f2f1

Please sign in to comment.