Clean up docs for PySpark context methods
MLnick committed Apr 19, 2014
1 parent b20ec7e commit 4e08983
1 changed file: python/pyspark/context.py (21 additions, 34 deletions)
@@ -297,8 +297,6 @@ def wholeTextFiles(self, path):
return RDD(self._jsc.wholeTextFiles(path), self,
PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

- ###

def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
key_wrapper="", value_wrapper="", minSplits=None):
"""
@@ -308,30 +306,33 @@ def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class=
1. A Java RDD is created from the SequenceFile, key and value classes
2. Serialization is attempted via MsgPack
3. If this fails, the fallback is to call 'toString' on each key and value
- 4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+ 4. C{MsgpackSerializer} is used to deserialize data on the Python side
>>> sc.sequenceFile("test_support/data/sfint/").collect()
[(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
>>> sc.sequenceFile("test_support/data/sfdouble/").collect()
[(1.0, 'aa'), (2.0, 'bb'), (2.0, 'aa'), (3.0, 'cc'), (2.0, 'bb'), (1.0, 'aa')]
>>> sc.sequenceFile("test_support/data/sftext/").collect()
[('1', 'aa'), ('2', 'bb'), ('2', 'aa'), ('3', 'cc'), ('2', 'bb'), ('1', 'aa')]
>>> sc.sequenceFile("test_support/data/sfbool/").collect()
[(1, True), (2, True), (2, False), (3, True), (2, False), (1, False)]
>>> sc.sequenceFile("test_support/data/sfnull/").collect()
[(1, None), (2, None), (2, None), (3, None), (2, None), (1, None)]
>>> sc.sequenceFile("test_support/data/sfmap/").collect()
[(1, {2.0: 'aa'}), (2, {3.0: 'bb'}), (2, {1.0: 'cc'}), (3, {2.0: 'dd'}), (2, {1.0: 'aa'}), (1, {3.0: 'bb'})]
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
minSplits)
return RDD(jrdd, self, MsgpackSerializer())
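For illustration (not part of this commit), a doctest-style sketch of calling the method above; the path and the IntWritable key class are hypothetical, and the Text defaults from the signature apply whenever the class arguments are omitted:

>>> # hypothetical path; IntWritable keys deserialize to Python ints, Text values to strings
>>> rdd = sc.sequenceFile("hdfs:///tmp/ints",
...                       key_class="org.apache.hadoop.io.IntWritable",
...                       value_class="org.apache.hadoop.io.Text",
...                       minSplits=4)
>>> pairs = rdd.collect()  # e.g. [(1, 'aa'), (2, 'bb'), ...]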

- def newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
+ def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
+                      value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
value_wrapper="toString", conf={}):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
- The mechanism is as follows:
- 1. A Java RDD is created from the InputFormat, key and value classes
- 2. Serialization is attempted via MsgPack
- 3. If this fails, the fallback is to call 'toString' on each key and value
- 4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+ The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
"""
Expand All @@ -342,16 +343,12 @@ def newAPIHadoopFile(self, name, inputformat_class, key_class, value_class, key_
key_wrapper, value_wrapper, jconf)
return RDD(jrdd, self, MsgpackSerializer())
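For illustration (not part of this commit), a sketch of reading a plain text file through the new-API TextInputFormat; the path and the split-size conf entry are hypothetical, and conf values are plain strings that get copied into the Java Configuration as described in the docstring:

>>> conf = {"mapreduce.input.fileinputformat.split.minsize": "134217728"}  # hypothetical tuning entry
>>> lines = sc.newAPIHadoopFile("hdfs:///tmp/logs",
...                             "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
...                             key_class="org.apache.hadoop.io.LongWritable",
...                             value_class="org.apache.hadoop.io.Text",
...                             conf=conf)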

- def newAPIHadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
-                     value_wrapper="toString", conf={}):
+ def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
+                     value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
- that is passed in as a Python dict. This will be converted into a Configuration in Java.
- The mechanism is as follows:
- 1. A Java RDD is created from the InputFormat, key and value classes
- 2. Serialization is attempted via MsgPack
- 3. If this fails, the fallback is to call 'toString' on each key and value
- 4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+ which is passed in as a Python dict. This will be converted into a Configuration in Java.
+ The mechanism is the same as for sc.sequenceFile.
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
@@ -360,16 +357,12 @@ def newAPIHadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper
value_wrapper, jconf)
return RDD(jrdd, self, MsgpackSerializer())

- def hadoopFile(self, name, inputformat_class, key_class, value_class, key_wrapper="toString",
-                value_wrapper="toString", conf={}):
+ def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
+                value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
- The mechanism is as follows:
- 1. A Java RDD is created from the InputFormat, key and value classes
- 2. Serialization is attempted via MsgPack
- 3. If this fails, the fallback is to call 'toString' on each key and value
- 4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+ The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
"""
@@ -380,16 +373,12 @@ def hadoopFile(self, name, inputformat_class, key_class, value_class, key_wrappe
value_wrapper, jconf)
return RDD(jrdd, self, MsgpackSerializer())

- def hadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toString",
-               value_wrapper="toString", conf={}):
+ def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
+               value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
- that is passed in as a Python dict. This will be converted into a Configuration in Java.
- The mechanism is as follows:
- 1. A Java RDD is created from the InputFormat, key and value classes
- 2. Serialization is attempted via MsgPack
- 3. If this fails, the fallback is to call 'toString' on each key and value
- 4. C{MsgPackDeserializer} is used to deserialize data on the Python side
+ which is passed in as a Python dict. This will be converted into a Configuration in Java.
+ The mechanism is the same as for sc.sequenceFile.
"""
jconf = self._jvm.java.util.HashMap()
for k, v in conf.iteritems():
@@ -398,8 +387,6 @@ def hadoopRDD(self, inputformat_class, key_class, value_class, key_wrapper="toSt
value_wrapper, jconf)
return RDD(jrdd, self, MsgpackSerializer())
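For illustration (not part of this commit), a sketch of the conf-only variant, where even the input path is supplied through the configuration dict; the old-API property name and the path are assumptions:

>>> conf = {"mapred.input.dir": "hdfs:///tmp/logs"}  # hypothetical input path property
>>> lines = sc.hadoopRDD("org.apache.hadoop.mapred.TextInputFormat",
...                      key_class="org.apache.hadoop.io.LongWritable",
...                      value_class="org.apache.hadoop.io.Text",
...                      conf=conf)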

- ####

def _checkpointFile(self, name, input_deserializer):
jrdd = self._jsc.checkpointFile(name)
return RDD(jrdd, self, input_deserializer)
