Add back context.py changes
MLnick committed Apr 23, 2014
1 parent 9ef1896 commit 93ef995
Showing 1 changed file with 93 additions and 0 deletions.
93 changes: 93 additions & 0 deletions python/pyspark/context.py
@@ -297,6 +297,98 @@ def wholeTextFiles(self, path):
return RDD(self._jsc.wholeTextFiles(path), self,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
key_wrapper="", value_wrapper="", minSplits=None):
"""
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
The mechanism is as follows:
1. A Java RDD is created from the SequenceFile or other InputFormat, using the key and value Writable classes
2. Serialization is attempted via Pyrolite pickling
3. If this fails, the fallback is to call 'toString' on each key and value
4. C{PickleSerializer} is used to deserialize pickled objects on the Python side

>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect())
[(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect())
[(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sftext/").collect())
[(u'1', u'aa'), (u'1', u'aa'), (u'2', u'aa'), (u'2', u'bb'), (u'2', u'bb'), (u'3', u'cc')]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfbool/").collect())
[(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfnull/").collect())
[(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfmap/").collect())
[(1, {2.0: u'aa'}), (1, {3.0: u'bb'}), (2, {1.0: u'aa'}), (2, {1.0: u'cc'}), (2, {3.0: u'bb'}), (3, {2.0: u'dd'})]
>>> r = sorted(sc.sequenceFile(tempdir + "/sftestdata/sfclass").collect())[0]
>>> r == (u'1', {u'__class__': u'org.apache.spark.api.python.TestWritable', u'double': 54.0, u'int': 123, u'str': u'test1'})
True
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
minSplits)
return RDD(jrdd, self, PickleSerializer())
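
As a usage illustration (not part of the diff itself), here is a minimal sketch of calling this method; the HDFS path and Writable class names below are assumptions, though the int/unicode result types match the doctests above:

# Hypothetical example: read an IntWritable/Text SequenceFile; the path and
# class names are made up for illustration.
rdd = sc.sequenceFile("hdfs:///data/pairs",
                      key_class="org.apache.hadoop.io.IntWritable",
                      value_class="org.apache.hadoop.io.Text")
print rdd.collect()  # keys arrive as Python ints, values as unicode strings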

def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
value_wrapper="toString", conf={}):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
key_wrapper, value_wrapper, jconf)
return RDD(jrdd, self, PickleSerializer())
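
For illustration (again, not part of the commit), a sketch of reading a file through a new-API InputFormat with an extra configuration entry; the path, InputFormat class, and conf key are assumptions and may vary by Hadoop version:

# Hypothetical example: new-API TextInputFormat with a conf dict entry.
rdd = sc.newAPIHadoopFile(
    "hdfs:///data/logs",
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    key_class="org.apache.hadoop.io.LongWritable",
    value_class="org.apache.hadoop.io.Text",
    conf={"mapreduce.input.fileinputformat.split.minsize": "65536"})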

def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
which is passed in as a Python dict. This will be converted into a Configuration in Java.
The mechanism is the same as for sc.sequenceFile.
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
value_wrapper, jconf)
return RDD(jrdd, self, PickleSerializer())
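
Because newAPIHadoopRDD takes no path argument, the input location must come from the configuration dict itself. A sketch under that assumption (the conf key and path are illustrative):

# Hypothetical example: input directory supplied purely via the Hadoop conf.
rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    key_class="org.apache.hadoop.io.LongWritable",
    value_class="org.apache.hadoop.io.Text",
    conf={"mapreduce.input.fileinputformat.inputdir": "hdfs:///data/logs"})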

def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java.
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
value_wrapper, jconf)
return RDD(jrdd, self, PickleSerializer())
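
A parallel sketch for the old ('mapred') API variant; note the InputFormat lives under org.apache.hadoop.mapred rather than mapreduce.lib.input (the path is an assumption):

# Hypothetical example: old-API TextInputFormat read.
rdd = sc.hadoopFile(
    "hdfs:///data/logs",
    "org.apache.hadoop.mapred.TextInputFormat",
    key_class="org.apache.hadoop.io.LongWritable",
    value_class="org.apache.hadoop.io.Text")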

def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
which is passed in as a Python dict. This will be converted into a Configuration in Java.
The mechanism is the same as for sc.sequenceFile.
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
value_wrapper, jconf)
return RDD(jrdd, self, PickleSerializer())
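
And the old-API analogue of the conf-driven form; 'mapred.input.dir' is the classic old-API property for input paths (the path itself is an assumption):

# Hypothetical example: old-API read driven entirely by the conf dict.
rdd = sc.hadoopRDD(
    "org.apache.hadoop.mapred.TextInputFormat",
    key_class="org.apache.hadoop.io.LongWritable",
    value_class="org.apache.hadoop.io.Text",
    conf={"mapred.input.dir": "hdfs:///data/logs"})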

def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
return RDD(jrdd, self, input_deserializer)
@@ -467,6 +559,7 @@ def _test():
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
globs['tempdir'] = tempfile.mkdtemp()
globs['sc']._jvm.WriteInputFormatTestDataGenerator.generateData(globs['tempdir'], globs['sc']._jsc)
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
