
Commit

Add old Hadoop api methods. Clean up and expand comments. Clean up argument names
MLnick committed Dec 19, 2013
1 parent 818a1e6 commit 4294cbb
Showing 1 changed file with 74 additions and 26 deletions.
python/pyspark/context.py
@@ -214,56 +214,104 @@ def textFile(self, name, minSplits=None):
                    MUTF8Deserializer())
 
     ###
-    def sequenceFile(self, name, keyClass="org.apache.hadoop.io.Text", valueClass="org.apache.hadoop.io.Text",
-                     keyWrapper="", valueWrapper="", minSplits=None):
+    def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
+                     key_wrapper="", value_wrapper="", minSplits=None):
         """
-        Read a Hadoopp SequenceFile with arbitrary key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI,
-        and return it as an RDD of (String, String) where the key and value representations
-        are generated using the 'toString()' method of the relevant Java class.
+        Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the SequenceFile, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
         >>> sc.sequenceFile("test_support/data/sfint/").collect()
         [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
         >>> sc.sequenceFile("test_support/data/sfdouble/").collect()
         [(1.0, 'aa'), (2.0, 'bb'), (2.0, 'aa'), (3.0, 'cc'), (2.0, 'bb'), (1.0, 'aa')]
         >>> sc.sequenceFile("test_support/data/sftext/").collect()
         [('1', 'aa'), ('2', 'bb'), ('2', 'aa'), ('3', 'cc'), ('2', 'bb'), ('1', 'aa')]
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyClass, valueClass, keyWrapper, valueWrapper,
+        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
                                                 minSplits)
-        #jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyWrapper, valueWrapper, minSplits)
-        return RDD(jrdd, self, MsgPackDeserializer()) # MsgPackDeserializer PairMUTF8Deserializer
+        return RDD(jrdd, self, MsgPackDeserializer())
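
A minimal usage sketch for the renamed sequenceFile arguments, assuming an active SparkContext sc and the doctest data under test_support/; the explicit Writable class names are illustrative, matching the IntWritable/Text pairs the first doctest reads:

# Key/value classes given explicitly rather than relying on the
# org.apache.hadoop.io.Text defaults; pairs come back deserialized via
# MsgPack, with a toString() fallback on the Java side.
pairs = sc.sequenceFile("test_support/data/sfint/",
                        key_class="org.apache.hadoop.io.IntWritable",
                        value_class="org.apache.hadoop.io.Text")
print(pairs.collect())  # [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]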

-    def newAPIHadoopFile(self, name, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString",
-                         conf = {}):
+    def newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+                         value_wrapper="toString", conf={}):
         """
-        Read a Hadoopp file with arbitrary InputFormat, key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI,
-        and return it as an RDD of (String, String), where the key and value representations
-        are generated using the 'toString()' method of the relevant Java class.
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
         """
         jconf = self._jvm.java.util.HashMap()
         for k, v in conf.iteritems():
             jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputFormat, keyClass, valueClass, keyWrapper,
-                                                    valueWrapper, jconf)
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
+                                                    key_wrapper, value_wrapper, jconf)
         return RDD(jrdd, self, MsgPackDeserializer())
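
A hedged sketch of calling newAPIHadoopFile, again assuming sc; the path is hypothetical, the input format is the new-API TextInputFormat from org.apache.hadoop.mapreduce, and the conf entry is just an arbitrary example of a key copied into the Java Configuration:

kv = sc.newAPIHadoopFile(
    "hdfs:///tmp/input",                                      # hypothetical path
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",  # new-API input format
    "org.apache.hadoop.io.LongWritable",                      # key: byte offset of the line
    "org.apache.hadoop.io.Text",                              # value: line contents
    conf={"io.file.buffer.size": "65536"})                    # copied into the Java Configuration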

-    def newAPIHadoopRDD(self, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString",
-                        conf = {}):
+    def newAPIHadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
+                        value_wrapper="toString", conf={}):
         """
-        Read a Hadoopp file with arbitrary InputFormat, key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI,
-        and return it as an RDD of (String, String), where the key and value representations
-        are generated using the 'toString()' method of the relevant Java class.
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
+        that is passed in as a Python dict. This will be converted into a Configuration in Java.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
         """
         jconf = self._jvm.java.util.HashMap()
         for k, v in conf.iteritems():
             jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormat, keyClass, valueClass, keyWrapper,
-                                                   valueWrapper, jconf)
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
+                                                   value_wrapper, jconf)
         return RDD(jrdd, self, MsgPackDeserializer())
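
Since newAPIHadoopRDD takes no path argument, the input location has to travel in the conf dict. A sketch, assuming sc; the property name below is the Hadoop 2 spelling (mapreduce.input.fileinputformat.inputdir), with mapred.input.dir as the older equivalent:

rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf={"mapreduce.input.fileinputformat.inputdir": "hdfs:///tmp/input"})  # assumed property name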

+    ###
+    def hadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+                   value_wrapper="toString", conf={}):
+        """
+        Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
+                                              value_wrapper, jconf)
+        return RDD(jrdd, self, MsgPackDeserializer())
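
The old-API twin differs only in which InputFormat hierarchy it accepts; a sketch with org.apache.hadoop.mapred.TextInputFormat, the path again hypothetical and sc assumed:

kv = sc.hadoopFile(
    "hdfs:///tmp/input",                         # hypothetical path
    "org.apache.hadoop.mapred.TextInputFormat",  # old-API input format
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text")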

+    def hadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
+                  value_wrapper="toString", conf={}):
+        """
+        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
+        that is passed in as a Python dict. This will be converted into a Configuration in Java.
+        The mechanism is as follows:
+            1. A Java RDD is created from the InputFormat, key and value classes
+            2. Serialization is attempted via MsgPack
+            3. If this fails, the fallback is to call 'toString' on each key and value
+            4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+        """
+        jconf = self._jvm.java.util.HashMap()
+        for k, v in conf.iteritems():
+            jconf[k] = v
+        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
+                                             value_wrapper, jconf)
+        return RDD(jrdd, self, MsgPackDeserializer())
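
And the old-API counterpart of newAPIHadoopRDD, where the input path rides in the old property namespace (mapred.input.dir); a sketch assuming sc:

rdd = sc.hadoopRDD(
    "org.apache.hadoop.mapred.TextInputFormat",
    "org.apache.hadoop.io.LongWritable",
    "org.apache.hadoop.io.Text",
    conf={"mapred.input.dir": "hdfs:///tmp/input"})  # old-API input path property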

     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
