diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 2f227c0ba3b66..4a93222144453 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -342,32 +342,21 @@ def sequenceFile(self, path, keyClass, valueClass, keyConverter=None, valueConve Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. The mechanism is as follows: - 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes + 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key + and value Writable classes 2. Serialization is attempted via Pyrolite pickling 3. If this fails, the fallback is to call 'toString' on each key and value 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side - @param path: - @param keyClass: - @param valueClass: - @param keyWrapper: - @param valueWrapper: - @param minSplits: - >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect()) - [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] - >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect()) - [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] - >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sftext/").collect()) - [(u'1', u'aa'), (u'1', u'aa'), (u'2', u'aa'), (u'2', u'bb'), (u'2', u'bb'), (u'3', u'cc')] - >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfbool/").collect()) - [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] - >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfnull/").collect()) - [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)] - >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfmap/").collect()) - [(1, {2.0: u'aa'}), (1, {3.0: u'bb'}), (2, {1.0: u'aa'}), (2, {1.0: u'cc'}), (2, {3.0: u'bb'}), (3, {2.0: u'dd'})] - >>> r = sorted(sc.sequenceFile(tempdir + "/sftestdata/sfclass").collect())[0] - >>> r == (u'1', {u'__class__': u'org.apache.spark.api.python.TestWritable', u'double': 54.0, u'int': 123, u'str': u'test1'}) - True + @param path: path to sequncefile + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: + @param valueConverter: + @param minSplits: minimum splits in dataset + (default min(2, sc.defaultParallelism)) """ minSplits = minSplits or min(self.defaultParallelism, 2) jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, @@ -383,6 +372,18 @@ def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConv A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java + + @param path: path to Hadoop file + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) """ jconf = self._dictToJavaMap(conf) jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, @@ -393,9 +394,20 @@ def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=N valueConverter=None, conf=None): """ Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary - Hadoop configuration, - which is passed in as a Python dict. This will be converted into a Configuration in Java. + Hadoop configuration, which is passed in as a Python dict. + This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile. + + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) """ jconf = self._dictToJavaMap(conf) jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, @@ -410,11 +422,21 @@ def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter= The mechanism is the same as for sc.sequenceFile. A Hadoop configuration can be passed in as a Python dict. This will be converted into a - Configuration in Java + Configuration in Java. + + @param path: path to Hadoop file + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapred.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) """ jconf = self._dictToJavaMap(conf) - for k, v in conf.iteritems(): - jconf[k] = v jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf) return RDD(jrdd, self, PickleSerializer()) @@ -423,9 +445,20 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, valueConverter=None, conf=None): """ Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary - Hadoop configuration, - which is passed in as a Python dict. This will be converted into a Configuration in Java. + Hadoop configuration, which is passed in as a Python dict. + This will be converted into a Configuration in Java. The mechanism is the same as for sc.sequenceFile. + + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapred.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) """ jconf = self._dictToJavaMap(conf) jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, @@ -674,7 +707,6 @@ def _test(): globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) globs['tempdir'] = tempfile.mkdtemp() - globs['sc']._jvm.WriteInputFormatTestDataGenerator.generateData(globs['tempdir'], globs['sc']._jsc) atexit.register(lambda: shutil.rmtree(globs['tempdir'])) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs['sc'].stop() diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index ed90915fcda35..6b5cd96df597c 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -202,6 +202,140 @@ def func(x): self.sc.parallelize([1]).foreach(func) +class TestInputFormat(PySparkTestCase): + + def setUp(self): + PySparkTestCase.setUp(self) + self.tempdir = tempfile.NamedTemporaryFile(delete=False) + os.unlink(self.tempdir.name) + self.sc._jvm.WriteInputFormatTestDataGenerator.generateData(self.tempdir.name, self.sc._jsc) + + def tearDown(self): + PySparkTestCase.tearDown(self) + shutil.rmtree(self.tempdir.name) + + def test_sequencefiles(self): + basepath = self.tempdir.name + ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/", + "org.apache.hadoop.io.DoubleWritable", + "org.apache.hadoop.io.Text").collect()) + ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] + self.assertEqual(doubles, ed) + + text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/", + "org.apache.hadoop.io.Text", + "org.apache.hadoop.io.Text").collect()) + et = [(u'1', u'aa'), + (u'1', u'aa'), + (u'2', u'aa'), + (u'2', u'bb'), + (u'2', u'bb'), + (u'3', u'cc')] + self.assertEqual(text, et) + + bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BooleanWritable").collect()) + eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] + self.assertEqual(bools, eb) + + nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BooleanWritable").collect()) + en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)] + self.assertEqual(nulls, en) + + maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable").collect()) + em = [(1, {2.0: u'aa'}), + (1, {3.0: u'bb'}), + (2, {1.0: u'aa'}), + (2, {1.0: u'cc'}), + (2, {3.0: u'bb'}), + (3, {2.0: u'dd'})] + self.assertEqual(maps, em) + + clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/", + "org.apache.hadoop.io.Text", + "org.apache.spark.api.python.TestWritable").collect()) + ec = (u'1', + {u'__class__': u'org.apache.spark.api.python.TestWritable', + u'double': 54.0, u'int': 123, u'str': u'test1'}) + self.assertEqual(clazz[0], ec) + + def test_oldhadoop(self): + basepath = self.tempdir.name + ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + hello = self.sc.hadoopFile("python/test_support/hello.txt", + "org.apache.hadoop.mapred.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text").collect() + result = [(0, u'Hello World!')] + self.assertEqual(hello, result) + + def test_newhadoop(self): + basepath = self.tempdir.name + ints = sorted(self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + hello = self.sc.newAPIHadoopFile("python/test_support/hello.txt", + "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text").collect() + result = [(0, u'Hello World!')] + self.assertEqual(hello, result) + + def test_newolderror(self): + basepath = self.tempdir.name + newFromOld = self.sc.hadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text") + self.assertRaises(Exception, lambda: newFromOld.collect()) + + oldFromNew = self.sc.newAPIHadoopFile(basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text") + self.assertRaises(Exception, lambda: oldFromNew.collect()) + + def test_bad_inputs(self): + basepath = self.tempdir.name + self.assertRaises(Exception, lambda: self.sc.sequenceFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.io.NotValidWritable", + "org.apache.hadoop.io.Text")) + self.assertRaises(Exception, lambda: self.sc.hadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.NotValidInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.NotValidInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + class TestDaemon(unittest.TestCase): def connect(self, port): from socket import socket, AF_INET, SOCK_STREAM