Move input format tests to tests.py and clean up docs
MLnick committed Jun 3, 2014
1 parent 43eb728 commit 085b55f
Showing 2 changed files with 196 additions and 30 deletions.
92 changes: 62 additions & 30 deletions python/pyspark/context.py
@@ -342,32 +342,21 @@ def sequenceFile(self, path, keyClass, valueClass, keyConverter=None, valueConve
Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI.
The mechanism is as follows:
1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
1. A Java RDD is created from the SequenceFile or other InputFormat, and the key
and value Writable classes
2. Serialization is attempted via Pyrolite pickling
3. If this fails, the fallback is to call 'toString' on each key and value
4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
@param path:
@param keyClass:
@param valueClass:
@param keyWrapper:
@param valueWrapper:
@param minSplits:
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect())
[(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect())
[(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sftext/").collect())
[(u'1', u'aa'), (u'1', u'aa'), (u'2', u'aa'), (u'2', u'bb'), (u'2', u'bb'), (u'3', u'cc')]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfbool/").collect())
[(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfnull/").collect())
[(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
>>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfmap/").collect())
[(1, {2.0: u'aa'}), (1, {3.0: u'bb'}), (2, {1.0: u'aa'}), (2, {1.0: u'cc'}), (2, {3.0: u'bb'}), (3, {2.0: u'dd'})]
>>> r = sorted(sc.sequenceFile(tempdir + "/sftestdata/sfclass").collect())[0]
>>> r == (u'1', {u'__class__': u'org.apache.spark.api.python.TestWritable', u'double': 54.0, u'int': 123, u'str': u'test1'})
True
@param path: path to sequencefile
@param keyClass: fully qualified classname of key Writable class
(e.g. "org.apache.hadoop.io.Text")
@param valueClass: fully qualified classname of value Writable class
(e.g. "org.apache.hadoop.io.LongWritable")
@param keyConverter:
@param valueConverter:
@param minSplits: minimum splits in dataset
(default min(2, sc.defaultParallelism))
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass,
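As an illustration of the sequenceFile API documented above, a minimal usage sketch; the HDFS path is an assumption, and the Writable classes match the test data this commit generates in tests.py:

    # Read a SequenceFile of IntWritable/Text pairs as (int, unicode) tuples.
    # Pyrolite pickles the Writables; anything unpicklable falls back to toString.
    rdd = sc.sequenceFile("hdfs:///tmp/sftestdata/sfint/",
                          "org.apache.hadoop.io.IntWritable",
                          "org.apache.hadoop.io.Text")
    print(sorted(rdd.collect()))  # e.g. [(1, u'aa'), (2, u'bb'), ...]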
@@ -383,6 +372,18 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv
A Hadoop configuration can be passed in as a Python dict. This will be converted into a
Configuration in Java
@param path: path to Hadoop file
@param inputFormatClass: fully qualified classname of Hadoop InputFormat
(e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
@param keyClass: fully qualified classname of key Writable class
(e.g. "org.apache.hadoop.io.Text")
@param valueClass: fully qualified classname of value Writable class
(e.g. "org.apache.hadoop.io.LongWritable")
@param keyConverter: (None by default)
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
"""
jconf = self._dictToJavaMap(conf)
jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass,
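For comparison, a sketch of newAPIHadoopFile reading a plain text file through the new-API TextInputFormat, mirroring the test_newhadoop case added in tests.py below:

    # Records come back as (byte offset, line) pairs, e.g. (0, u'Hello World!').
    lines = sc.newAPIHadoopFile(
        "python/test_support/hello.txt",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text")
    print(lines.collect())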
@@ -393,9 +394,20 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N
valueConverter=None, conf=None):
"""
Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration,
which is passed in as a Python dict. This will be converted into a Configuration in Java.
Hadoop configuration, which is passed in as a Python dict.
This will be converted into a Configuration in Java.
The mechanism is the same as for sc.sequenceFile.
@param inputFormatClass: fully qualified classname of Hadoop InputFormat
(e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat")
@param keyClass: fully qualified classname of key Writable class
(e.g. "org.apache.hadoop.io.Text")
@param valueClass: fully qualified classname of value Writable class
(e.g. "org.apache.hadoop.io.LongWritable")
@param keyConverter: (None by default)
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
"""
jconf = self._dictToJavaMap(conf)
jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass,
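Because newAPIHadoopRDD takes no path argument, the input location has to travel in the conf dict; a sketch in which the configuration key is an assumption and depends on the Hadoop version in use:

    # "mapred.input.dir" is assumed here; Hadoop 2 prefers
    # "mapreduce.input.fileinputformat.inputdir" (the old key is still mapped).
    conf = {"mapred.input.dir": "hdfs:///tmp/sftestdata/sfint/"}
    rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
        "org.apache.hadoop.io.IntWritable",
        "org.apache.hadoop.io.Text",
        conf=conf)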
@@ -410,11 +422,21 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=
The mechanism is the same as for sc.sequenceFile.
A Hadoop configuration can be passed in as a Python dict. This will be converted into a
Configuration in Java
Configuration in Java.
@param path: path to Hadoop file
@param inputFormatClass: fully qualified classname of Hadoop InputFormat
(e.g. "org.apache.hadoop.mapred.TextInputFormat")
@param keyClass: fully qualified classname of key Writable class
(e.g. "org.apache.hadoop.io.Text")
@param valueClass: fully qualified classname of value Writable class
(e.g. "org.apache.hadoop.io.LongWritable")
@param keyConverter: (None by default)
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
"""
jconf = self._dictToJavaMap(conf)
for k, v in conf.iteritems():
jconf[k] = v
jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass,
valueClass, keyConverter, valueConverter, jconf)
return RDD(jrdd, self, PickleSerializer())
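The old-API counterpart differs only in the InputFormat package; a sketch mirroring test_oldhadoop in tests.py below:

    # Old mapred API: org.apache.hadoop.mapred.TextInputFormat rather than the
    # mapreduce.lib.input variant used by newAPIHadoopFile.
    hello = sc.hadoopFile("python/test_support/hello.txt",
                          "org.apache.hadoop.mapred.TextInputFormat",
                          "org.apache.hadoop.io.LongWritable",
                          "org.apache.hadoop.io.Text")
    print(hello.collect())  # [(0, u'Hello World!')]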
@@ -423,9 +445,20 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
valueConverter=None, conf=None):
"""
Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary
Hadoop configuration,
which is passed in as a Python dict. This will be converted into a Configuration in Java.
Hadoop configuration, which is passed in as a Python dict.
This will be converted into a Configuration in Java.
The mechanism is the same as for sc.sequenceFile.
@param inputFormatClass: fully qualified classname of Hadoop InputFormat
(e.g. "org.apache.hadoop.mapred.TextInputFormat")
@param keyClass: fully qualified classname of key Writable class
(e.g. "org.apache.hadoop.io.Text")
@param valueClass: fully qualified classname of value Writable class
(e.g. "org.apache.hadoop.io.LongWritable")
@param keyConverter: (None by default)
@param valueConverter: (None by default)
@param conf: Hadoop configuration, passed in as a dict
(None by default)
"""
jconf = self._dictToJavaMap(conf)
jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass,
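And the conf-driven old-API variant; as above, the input-directory key is an assumption tied to the Hadoop version:

    conf = {"mapred.input.dir": "hdfs:///tmp/sftestdata/sfint/"}  # assumed key
    rdd = sc.hadoopRDD("org.apache.hadoop.mapred.SequenceFileInputFormat",
                       "org.apache.hadoop.io.IntWritable",
                       "org.apache.hadoop.io.Text",
                       conf=conf)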
@@ -674,7 +707,6 @@ def _test():
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
globs['tempdir'] = tempfile.mkdtemp()
globs['sc']._jvm.WriteInputFormatTestDataGenerator.generateData(globs['tempdir'], globs['sc']._jsc)
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
134 changes: 134 additions & 0 deletions python/pyspark/tests.py
@@ -202,6 +202,140 @@ def func(x):
self.sc.parallelize([1]).foreach(func)


class TestInputFormat(PySparkTestCase):

def setUp(self):
PySparkTestCase.setUp(self)
self.tempdir = tempfile.NamedTemporaryFile(delete=False)
os.unlink(self.tempdir.name)
self.sc._jvm.WriteInputFormatTestDataGenerator.generateData(self.tempdir.name, self.sc._jsc)

def tearDown(self):
PySparkTestCase.tearDown(self)
shutil.rmtree(self.tempdir.name)

def test_sequencefiles(self):
basepath = self.tempdir.name
ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text").collect())
ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
self.assertEqual(ints, ei)

doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/",
"org.apache.hadoop.io.DoubleWritable",
"org.apache.hadoop.io.Text").collect())
ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
self.assertEqual(doubles, ed)

text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/",
"org.apache.hadoop.io.Text",
"org.apache.hadoop.io.Text").collect())
et = [(u'1', u'aa'),
(u'1', u'aa'),
(u'2', u'aa'),
(u'2', u'bb'),
(u'2', u'bb'),
(u'3', u'cc')]
self.assertEqual(text, et)

bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.BooleanWritable").collect())
eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
self.assertEqual(bools, eb)

nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.BooleanWritable").collect())
en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
self.assertEqual(nulls, en)

maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.MapWritable").collect())
em = [(1, {2.0: u'aa'}),
(1, {3.0: u'bb'}),
(2, {1.0: u'aa'}),
(2, {1.0: u'cc'}),
(2, {3.0: u'bb'}),
(3, {2.0: u'dd'})]
self.assertEqual(maps, em)

clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/",
"org.apache.hadoop.io.Text",
"org.apache.spark.api.python.TestWritable").collect())
ec = (u'1',
{u'__class__': u'org.apache.spark.api.python.TestWritable',
u'double': 54.0, u'int': 123, u'str': u'test1'})
self.assertEqual(clazz[0], ec)

def test_oldhadoop(self):
basepath = self.tempdir.name
ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/",
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text").collect())
ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
self.assertEqual(ints, ei)

hello = self.sc.hadoopFile("python/test_support/hello.txt",
"org.apache.hadoop.mapred.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text").collect()
result = [(0, u'Hello World!')]
self.assertEqual(hello, result)

def test_newhadoop(self):
basepath = self.tempdir.name
ints = sorted(self.sc.newAPIHadoopFile(
basepath + "/sftestdata/sfint/",
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text").collect())
ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
self.assertEqual(ints, ei)

hello = self.sc.newAPIHadoopFile("python/test_support/hello.txt",
"org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text").collect()
result = [(0, u'Hello World!')]
self.assertEqual(hello, result)

def test_newolderror(self):
basepath = self.tempdir.name
newFromOld = self.sc.hadoopFile(
basepath + "/sftestdata/sfint/",
"org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text")
self.assertRaises(Exception, lambda: newFromOld.collect())

oldFromNew = self.sc.newAPIHadoopFile(basepath + "/sftestdata/sfint/",
"org.apache.hadoop.mapred.SequenceFileInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text")
self.assertRaises(Exception, lambda: oldFromNew.collect())

def test_bad_inputs(self):
basepath = self.tempdir.name
self.assertRaises(Exception, lambda: self.sc.sequenceFile(
basepath + "/sftestdata/sfint/",
"org.apache.hadoop.io.NotValidWritable",
"org.apache.hadoop.io.Text"))
self.assertRaises(Exception, lambda: self.sc.hadoopFile(
basepath + "/sftestdata/sfint/",
"org.apache.hadoop.mapred.NotValidInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text"))
self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile(
basepath + "/sftestdata/sfint/",
"org.apache.hadoop.mapreduce.lib.input.NotValidInputFormat",
"org.apache.hadoop.io.IntWritable",
"org.apache.hadoop.io.Text"))


class TestDaemon(unittest.TestCase):
def connect(self, port):
from socket import socket, AF_INET, SOCK_STREAM
