Remove default args for key/value classes. Arg names to camelCase
MLnick committed Jun 3, 2014
1 parent 9fe6bd5 commit 15a7d07
Showing 1 changed file with 45 additions and 35 deletions.
80 changes: 45 additions & 35 deletions python/pyspark/context.py
@@ -328,8 +328,14 @@ def wholeTextFiles(self, path, minPartitions=None):
         return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
                    PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

-    def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
-                     key_wrapper="", value_wrapper="", minSplits=None):
+    def dictToJavaMap(self, d):
+        jm = self._jvm.java.util.HashMap()
+        for k, v in d.iteritems():
+            jm[k] = v
+        return jm
+
+    def sequenceFile(self, path, keyClass, valueClass, keyConverter="", valueConverter="",
+                     minSplits=None):
         """
         Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
@@ -338,6 +344,13 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
         2. Serialization is attempted via Pyrolite pickling
         3. If this fails, the fallback is to call 'toString' on each key and value
         4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
+        @param path: path to sequencefile
+        @param keyClass: fully qualified classname of key Writable class
+        @param valueClass: fully qualified classname of value Writable class
+        @param keyConverter: optional converter used to translate keys (default "")
+        @param valueConverter: optional converter used to translate values (default "")
+        @param minSplits: minimum splits in dataset (default min(2, sc.defaultParallelism))
         >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect())
         [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
         >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect())
@@ -355,69 +368,66 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
         True
         """
         minSplits = minSplits or min(self.defaultParallelism, 2)
-        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
-                                                minSplits)
+        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
+                                                keyConverter, valueConverter, minSplits)
         return RDD(jrdd, self, PickleSerializer())
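A minimal usage sketch of the new sequenceFile signature (the path and Writable classes below are illustrative assumptions, not taken from this commit):

    # Read a SequenceFile of (IntWritable, Text) records; keys and values are
    # turned into Python ints and unicode strings via Pyrolite pickling.
    rdd = sc.sequenceFile("hdfs:///tmp/sfint/",
                          keyClass="org.apache.hadoop.io.IntWritable",
                          valueClass="org.apache.hadoop.io.Text")
    sorted(rdd.collect())  # e.g. [(1, u'aa'), (2, u'bb'), ...]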

-    def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                         value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
-                         value_wrapper="toString", conf={}):
+    def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass,
+                         keyConverter="", valueConverter="", conf={}):
         """
         Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
         The mechanism is the same as for sc.sequenceFile.
-        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
+        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
+        Configuration in Java
         """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
-                                                    key_wrapper, value_wrapper, jconf)
+        jconf = self.dictToJavaMap(conf)
+        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
+                                                    valueClass, keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
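A similar sketch for newAPIHadoopFile (the input path is hypothetical; the class names are the standard new-API Hadoop text-input classes):

    # Each record from TextInputFormat is a (byte offset, line) pair.
    lines = sc.newAPIHadoopFile(
        "hdfs:///tmp/input.txt",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text")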

-    def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                        value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
+    def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass,
+                        keyConverter="", valueConverter="", conf={}):
         """
-        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
+        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
+        Hadoop configuration,
         which is passed in as a Python dict. This will be converted into a Configuration in Java.
         The mechanism is the same as for sc.sequenceFile.
         """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
-                                                   value_wrapper, jconf)
+        jconf = self.dictToJavaMap(conf)
+        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
+                                                   valueClass, keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
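Since newAPIHadoopRDD takes no path argument, the input location must come through conf, which dictToJavaMap turns into a java.util.HashMap; a sketch assuming the Hadoop 2 style input-directory key:

    conf = {"mapreduce.input.fileinputformat.inputdir": "hdfs:///tmp/input.txt"}
    lines = sc.newAPIHadoopRDD(
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf=conf)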

-    def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                   value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
+    def hadoopFile(self, path, inputFormatClass, keyClass, valueClass,
+                   keyConverter="", valueConverter="", conf={}):
         """
         Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
         a local file system (available on all nodes), or any Hadoop-supported file system URI.
         The mechanism is the same as for sc.sequenceFile.
-        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
+        A Hadoop configuration can be passed in as a Python dict. This will be converted into a
+        Configuration in Java
         """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
-                                              value_wrapper, jconf)
+        jconf = self.dictToJavaMap(conf)
+        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
+                                              valueClass, keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
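The old-API counterpart differs only in the InputFormat package; a sketch with assumed values:

    # Old (mapred) API text input: records are (LongWritable, Text) pairs.
    lines = sc.hadoopFile(
        "hdfs:///tmp/input.txt",
        "org.apache.hadoop.mapred.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text")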

-    def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                  value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
+    def hadoopRDD(self, inputFormatClass, keyClass, valueClass,
+                  keyConverter="", valueConverter="", conf={}):
         """
-        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
+        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
+        Hadoop configuration,
         which is passed in as a Python dict. This will be converted into a Configuration in Java.
         The mechanism is the same as for sc.sequenceFile.
         """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
-                                             value_wrapper, jconf)
+        jconf = self.dictToJavaMap(conf)
+        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
+                                             keyConverter, valueConverter, jconf)
         return RDD(jrdd, self, PickleSerializer())
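And hadoopRDD mirrors newAPIHadoopRDD for the old API, with the input path again supplied through conf; a sketch assuming the classic mapred.input.dir key:

    conf = {"mapred.input.dir": "hdfs:///tmp/input.txt"}
    lines = sc.hadoopRDD(
        "org.apache.hadoop.mapred.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf=conf)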

     def _checkpointFile(self, name, input_deserializer):