diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala new file mode 100644 index 0000000000000..adaa1ef6cf9ff --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import org.apache.spark.rdd.RDD +import org.apache.spark.Logging +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.io._ +import scala.util.{Failure, Success, Try} +import org.apache.spark.annotation.Experimental + + +/** + * :: Experimental :: + * A trait for use with reading custom classes in PySpark. Implement this trait and add custom + * transformation code by overriding the convert method. + */ +@Experimental +trait Converter[T, U] extends Serializable { + def convert(obj: T): U +} + +private[python] object Converter extends Logging { + + def getInstance(converterClass: Option[String]): Converter[Any, Any] = { + converterClass.map { cc => + Try { + val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]] + logInfo(s"Loaded converter: $cc") + c + } match { + case Success(c) => c + case Failure(err) => + logError(s"Failed to load converter: $cc") + throw err + } + }.getOrElse { new DefaultConverter } + } +} + +/** + * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects. + * Other objects are passed through without conversion. + */ +private[python] class DefaultConverter extends Converter[Any, Any] { + + /** + * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or + * object representation + */ + private def convertWritable(writable: Writable): Any = { + import collection.JavaConversions._ + writable match { + case iw: IntWritable => iw.get() + case dw: DoubleWritable => dw.get() + case lw: LongWritable => lw.get() + case fw: FloatWritable => fw.get() + case t: Text => t.toString + case bw: BooleanWritable => bw.get() + case byw: BytesWritable => byw.getBytes + case n: NullWritable => null + case aw: ArrayWritable => aw.get().map(convertWritable(_)) + case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) => + (convertWritable(k), convertWritable(v)) + }.toMap) + case other => other + } + } + + def convert(obj: Any): Any = { + obj match { + case writable: Writable => + convertWritable(writable) + case _ => + obj + } + } +} + +/** Utilities for working with Python objects <-> Hadoop-related objects */ +private[python] object PythonHadoopUtil { + + /** + * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]] + */ + def mapToConf(map: java.util.Map[String, String]): Configuration = { + import collection.JavaConversions._ + val conf = new Configuration() + map.foreach{ case (k, v) => conf.set(k, v) } + conf + } + + /** + * Merges two configurations, returns a copy of left with keys from right overwriting + * any matching keys in left + */ + def mergeConfs(left: Configuration, right: Configuration): Configuration = { + import collection.JavaConversions._ + val copy = new Configuration(left) + right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue)) + copy + } + + /** + * Converts an RDD of key-value pairs, where key and/or value could be instances of + * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)] + */ + def convertRDD[K, V](rdd: RDD[(K, V)], + keyConverter: Converter[Any, Any], + valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = { + rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) } + } + +} diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index d1df99300c5b1..f6570d335757a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -28,6 +28,9 @@ import scala.util.Try import net.razorvine.pickle.{Pickler, Unpickler} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapred.{InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.spark._ import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.broadcast.Broadcast @@ -266,7 +269,7 @@ private object SpecialLengths { val TIMING_DATA = -3 } -private[spark] object PythonRDD { +private[spark] object PythonRDD extends Logging { val UTF8 = Charset.forName("UTF-8") /** @@ -346,6 +349,180 @@ private[spark] object PythonRDD { } } + /** + * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]], + * key and value class. + * A key and/or value converter class can optionally be passed in + * (see [[org.apache.spark.api.python.Converter]]) + */ + def sequenceFile[K, V]( + sc: JavaSparkContext, + path: String, + keyClassMaybeNull: String, + valueClassMaybeNull: String, + keyConverterClass: String, + valueConverterClass: String, + minSplits: Int) = { + val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text") + val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text") + implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]] + implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]] + val kc = kcm.runtimeClass.asInstanceOf[Class[K]] + val vc = vcm.runtimeClass.asInstanceOf[Class[V]] + + val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits) + val keyConverter = Converter.getInstance(Option(keyConverterClass)) + val valueConverter = Converter.getInstance(Option(valueConverterClass)) + val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter) + JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + } + + /** + * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]], + * key and value class. + * A key and/or value converter class can optionally be passed in + * (see [[org.apache.spark.api.python.Converter]]) + */ + def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( + sc: JavaSparkContext, + path: String, + inputFormatClass: String, + keyClass: String, + valueClass: String, + keyConverterClass: String, + valueConverterClass: String, + confAsMap: java.util.HashMap[String, String]) = { + val conf = PythonHadoopUtil.mapToConf(confAsMap) + val baseConf = sc.hadoopConfiguration() + val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf) + val rdd = + newAPIHadoopRDDFromClassNames[K, V, F](sc, + Some(path), inputFormatClass, keyClass, valueClass, mergedConf) + val keyConverter = Converter.getInstance(Option(keyConverterClass)) + val valueConverter = Converter.getInstance(Option(valueConverterClass)) + val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter) + JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + } + + /** + * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is + * passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]], + * key and value class. + * A key and/or value converter class can optionally be passed in + * (see [[org.apache.spark.api.python.Converter]]) + */ + def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]]( + sc: JavaSparkContext, + inputFormatClass: String, + keyClass: String, + valueClass: String, + keyConverterClass: String, + valueConverterClass: String, + confAsMap: java.util.HashMap[String, String]) = { + val conf = PythonHadoopUtil.mapToConf(confAsMap) + val rdd = + newAPIHadoopRDDFromClassNames[K, V, F](sc, + None, inputFormatClass, keyClass, valueClass, conf) + val keyConverter = Converter.getInstance(Option(keyConverterClass)) + val valueConverter = Converter.getInstance(Option(valueConverterClass)) + val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter) + JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + } + + private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]]( + sc: JavaSparkContext, + path: Option[String] = None, + inputFormatClass: String, + keyClass: String, + valueClass: String, + conf: Configuration) = { + implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]] + implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]] + implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]] + val kc = kcm.runtimeClass.asInstanceOf[Class[K]] + val vc = vcm.runtimeClass.asInstanceOf[Class[V]] + val fc = fcm.runtimeClass.asInstanceOf[Class[F]] + val rdd = if (path.isDefined) { + sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf) + } else { + sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc) + } + rdd + } + + /** + * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]], + * key and value class. + * A key and/or value converter class can optionally be passed in + * (see [[org.apache.spark.api.python.Converter]]) + */ + def hadoopFile[K, V, F <: InputFormat[K, V]]( + sc: JavaSparkContext, + path: String, + inputFormatClass: String, + keyClass: String, + valueClass: String, + keyConverterClass: String, + valueConverterClass: String, + confAsMap: java.util.HashMap[String, String]) = { + val conf = PythonHadoopUtil.mapToConf(confAsMap) + val baseConf = sc.hadoopConfiguration() + val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf) + val rdd = + hadoopRDDFromClassNames[K, V, F](sc, + Some(path), inputFormatClass, keyClass, valueClass, mergedConf) + val keyConverter = Converter.getInstance(Option(keyConverterClass)) + val valueConverter = Converter.getInstance(Option(valueConverterClass)) + val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter) + JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + } + + /** + * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map + * that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]], + * key and value class + * A key and/or value converter class can optionally be passed in + * (see [[org.apache.spark.api.python.Converter]]) + */ + def hadoopRDD[K, V, F <: InputFormat[K, V]]( + sc: JavaSparkContext, + inputFormatClass: String, + keyClass: String, + valueClass: String, + keyConverterClass: String, + valueConverterClass: String, + confAsMap: java.util.HashMap[String, String]) = { + val conf = PythonHadoopUtil.mapToConf(confAsMap) + val rdd = + hadoopRDDFromClassNames[K, V, F](sc, + None, inputFormatClass, keyClass, valueClass, conf) + val keyConverter = Converter.getInstance(Option(keyConverterClass)) + val valueConverter = Converter.getInstance(Option(valueConverterClass)) + val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter) + JavaRDD.fromRDD(SerDeUtil.rddToPython(converted)) + } + + private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]]( + sc: JavaSparkContext, + path: Option[String] = None, + inputFormatClass: String, + keyClass: String, + valueClass: String, + conf: Configuration) = { + implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]] + implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]] + implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]] + val kc = kcm.runtimeClass.asInstanceOf[Class[K]] + val vc = vcm.runtimeClass.asInstanceOf[Class[V]] + val fc = fcm.runtimeClass.asInstanceOf[Class[F]] + val rdd = if (path.isDefined) { + sc.sc.hadoopFile(path.get, fc, kc, vc) + } else { + sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc) + } + rdd + } + def writeUTF(str: String, dataOut: DataOutputStream) { val bytes = str.getBytes(UTF8) dataOut.writeInt(bytes.length) diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala new file mode 100644 index 0000000000000..9a012e7254901 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import scala.util.Try +import org.apache.spark.rdd.RDD +import org.apache.spark.Logging +import scala.util.Success +import scala.util.Failure +import net.razorvine.pickle.Pickler + + +/** Utilities for serialization / deserialization between Python and Java, using Pickle. */ +private[python] object SerDeUtil extends Logging { + + private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = { + val pickle = new Pickler + val kt = Try { + pickle.dumps(t._1) + } + val vt = Try { + pickle.dumps(t._2) + } + (kt, vt) match { + case (Failure(kf), Failure(vf)) => + logWarning(s""" + |Failed to pickle Java object as key: ${t._1.getClass.getSimpleName}, falling back + |to 'toString'. Error: ${kf.getMessage}""".stripMargin) + logWarning(s""" + |Failed to pickle Java object as value: ${t._2.getClass.getSimpleName}, falling back + |to 'toString'. Error: ${vf.getMessage}""".stripMargin) + (true, true) + case (Failure(kf), _) => + logWarning(s""" + |Failed to pickle Java object as key: ${t._1.getClass.getSimpleName}, falling back + |to 'toString'. Error: ${kf.getMessage}""".stripMargin) + (true, false) + case (_, Failure(vf)) => + logWarning(s""" + |Failed to pickle Java object as value: ${t._2.getClass.getSimpleName}, falling back + |to 'toString'. Error: ${vf.getMessage}""".stripMargin) + (false, true) + case _ => + (false, false) + } + } + + /** + * Convert an RDD of key-value pairs to an RDD of serialized Python objects, that is usable + * by PySpark. By default, if serialization fails, toString is called and the string + * representation is serialized + */ + def rddToPython(rdd: RDD[(Any, Any)]): RDD[Array[Byte]] = { + val (keyFailed, valueFailed) = checkPickle(rdd.first()) + rdd.mapPartitions { iter => + val pickle = new Pickler + iter.map { case (k, v) => + if (keyFailed && valueFailed) { + pickle.dumps(Array(k.toString, v.toString)) + } else if (keyFailed) { + pickle.dumps(Array(k.toString, v)) + } else if (!keyFailed && valueFailed) { + pickle.dumps(Array(k, v.toString)) + } else { + pickle.dumps(Array(k, v)) + } + } + } + } + +} + diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala new file mode 100644 index 0000000000000..f0e3fb9aff5a0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import org.apache.spark.SparkContext +import org.apache.hadoop.io._ +import scala.Array +import java.io.{DataOutput, DataInput} +import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat +import org.apache.spark.api.java.JavaSparkContext + +/** + * A class to test MsgPack serialization on the Scala side, that will be deserialized + * in Python + * @param str + * @param int + * @param double + */ +case class TestWritable(var str: String, var int: Int, var double: Double) extends Writable { + def this() = this("", 0, 0.0) + + def getStr = str + def setStr(str: String) { this.str = str } + def getInt = int + def setInt(int: Int) { this.int = int } + def getDouble = double + def setDouble(double: Double) { this.double = double } + + def write(out: DataOutput) = { + out.writeUTF(str) + out.writeInt(int) + out.writeDouble(double) + } + + def readFields(in: DataInput) = { + str = in.readUTF() + int = in.readInt() + double = in.readDouble() + } +} + +class TestConverter extends Converter[Any, Any] { + import collection.JavaConversions._ + override def convert(obj: Any) = { + val m = obj.asInstanceOf[MapWritable] + seqAsJavaList(m.keySet.map(w => w.asInstanceOf[DoubleWritable].get()).toSeq) + } +} + +/** + * This object contains method to generate SequenceFile test data and write it to a + * given directory (probably a temp directory) + */ +object WriteInputFormatTestDataGenerator { + import SparkContext._ + + def main(args: Array[String]) { + val path = args(0) + val sc = new JavaSparkContext("local[4]", "test-writables") + generateData(path, sc) + } + + def generateData(path: String, jsc: JavaSparkContext) { + val sc = jsc.sc + + val basePath = s"$path/sftestdata/" + val textPath = s"$basePath/sftext/" + val intPath = s"$basePath/sfint/" + val doublePath = s"$basePath/sfdouble/" + val arrPath = s"$basePath/sfarray/" + val mapPath = s"$basePath/sfmap/" + val classPath = s"$basePath/sfclass/" + val bytesPath = s"$basePath/sfbytes/" + val boolPath = s"$basePath/sfbool/" + val nullPath = s"$basePath/sfnull/" + + /* + * Create test data for IntWritable, DoubleWritable, Text, BytesWritable, + * BooleanWritable and NullWritable + */ + val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa")) + sc.parallelize(intKeys).saveAsSequenceFile(intPath) + sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath) + sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath) + sc.parallelize(intKeys.map{ case (k, v) => (k, v.getBytes) }).saveAsSequenceFile(bytesPath) + val bools = Seq((1, true), (2, true), (2, false), (3, true), (2, false), (1, false)) + sc.parallelize(bools).saveAsSequenceFile(boolPath) + sc.parallelize(intKeys).map{ case (k, v) => + (new IntWritable(k), NullWritable.get()) + }.saveAsSequenceFile(nullPath) + + // Create test data for ArrayWritable + val data = Seq( + (1, Array(1.0, 2.0, 3.0)), + (2, Array(3.0, 4.0, 5.0)), + (3, Array(4.0, 5.0, 6.0)) + ) + sc.parallelize(data, numSlices = 2) + .map{ case (k, v) => + (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_)))) + }.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath) + + // Create test data for MapWritable, with keys DoubleWritable and values Text + val mapData = Seq( + (1, Map(2.0 -> "aa")), + (2, Map(3.0 -> "bb")), + (2, Map(1.0 -> "cc")), + (3, Map(2.0 -> "dd")), + (2, Map(1.0 -> "aa")), + (1, Map(3.0 -> "bb")) + ) + sc.parallelize(mapData, numSlices = 2).map{ case (i, m) => + val mw = new MapWritable() + val k = m.keys.head + val v = m.values.head + mw.put(new DoubleWritable(k), new Text(v)) + (new IntWritable(i), mw) + }.saveAsSequenceFile(mapPath) + + // Create test data for arbitrary custom writable TestWritable + val testClass = Seq( + ("1", TestWritable("test1", 123, 54.0)), + ("2", TestWritable("test2", 456, 8762.3)), + ("1", TestWritable("test3", 123, 423.1)), + ("3", TestWritable("test56", 456, 423.5)), + ("2", TestWritable("test2", 123, 5435.2)) + ) + val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) } + rdd.saveAsNewAPIHadoopFile(classPath, + classOf[Text], classOf[TestWritable], + classOf[SequenceFileOutputFormat[Text, TestWritable]]) + } + + +} diff --git a/docs/programming-guide.md b/docs/programming-guide.md index 7d77e640d0e4b..7989e02dfb732 100644 --- a/docs/programming-guide.md +++ b/docs/programming-guide.md @@ -359,8 +359,7 @@ Apart from text files, Spark's Java API also supports several other data formats
-PySpark can create distributed datasets from any file system supported by Hadoop, including your local file system, HDFS, KFS, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. -The current API is limited to text files, but support for binary Hadoop InputFormats is expected in future versions. +PySpark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, [Amazon S3](http://wiki.apache.org/hadoop/AmazonS3), etc. Spark supports text files, [SequenceFiles](http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/mapred/SequenceFileInputFormat.html), and any other Hadoop [InputFormat](http://hadoop.apache.org/docs/stable/api/org/apache/hadoop/mapred/InputFormat.html). Text file RDDs can be created using `SparkContext`'s `textFile` method. This method takes an URI for the file (either a local path on the machine, or a `hdfs://`, `s3n://`, etc URI) and reads it as a collection of lines. Here is an example invocation: @@ -378,11 +377,90 @@ Some notes on reading files with Spark: * The `textFile` method also takes an optional second argument for controlling the number of slices of the file. By default, Spark creates one slice for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of slices by passing a larger value. Note that you cannot have fewer slices than blocks. -Apart reading files as a collection of lines, +Apart from reading files as a collection of lines, `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file. -
+### SequenceFile and Hadoop InputFormats + +In addition to reading text files, PySpark supports reading ```SequenceFile``` +and any arbitrary ```InputFormat```. + +**Note** this feature is currently marked ```Experimental``` and is intended for advanced users. It may be replaced in future with read/write support based on SparkSQL, in which case SparkSQL is the preferred approach. + +#### Writable Support + +PySpark SequenceFile support loads an RDD within Java, and pickles the resulting Java objects using +[Pyrolite](https://github.com/irmen/Pyrolite/). The following Writables are automatically converted: + + + + + + + + + + + + + + +
Writable TypePython Type
Textunicode str
IntWritableint
FloatWritablefloat
DoubleWritablefloat
BooleanWritablebool
BytesWritablebytearray
NullWritableNone
ArrayWritablelist of primitives, or tuple of objects
MapWritabledict
Custom Class conforming to Java Bean conventionsdict of public properties (via JavaBean getters and setters) + __class__ for the class type
+ +#### Loading SequenceFiles +Similarly to text files, SequenceFiles can be loaded by specifying the path. The key and value +classes can be specified, but for standard Writables this is not required. + +{% highlight python %} +>>> rdd = sc.sequenceFile("path/to/sequencefile/of/doubles") +>>> rdd.collect() # this example has DoubleWritable keys and Text values +[(1.0, u'aa'), + (2.0, u'bb'), + (2.0, u'aa'), + (3.0, u'cc'), + (2.0, u'bb'), + (1.0, u'aa')] +{% endhighlight %} + +#### Loading Other Hadoop InputFormats + +PySpark can also read any Hadoop InputFormat, for both 'new' and 'old' Hadoop APIs. If required, +a Hadoop configuration can be passed in as a Python dict. Here is an example using the +Elasticsearch ESInputFormat: + +{% highlight python %} +$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark +>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults +>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\ + "org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf) +>>> rdd.first() # the result is a MapWritable that is converted to a Python dict +(u'Elasticsearch ID', + {u'field1': True, + u'field2': u'Some Text', + u'field3': 12345}) +{% endhighlight %} + +Note that, if the InputFormat simply depends on a Hadoop configuration and/or input path, and +the key and value classes can easily be converted according to the above table, +then this approach should work well for such cases. + +If you have custom serialized binary data (such as loading data from Cassandra / HBase) or custom +classes that don't conform to the JavaBean requirements, then you will first need to +transform that data on the Scala/Java side to something which can be handled by Pyrolite's pickler. +A [Converter](api/scala/index.html#org.apache.spark.api.python.Converter) trait is provided +for this. Simply extend this trait and implement your transformation code in the ```convert``` +method. Remember to ensure that this class, along with any dependencies required to access your ```InputFormat```, are packaged into your Spark job jar and included on the PySpark +classpath. + +See the [Python examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/python) and +the [Converter examples]({{site.SPARK_GITHUB_URL}}/tree/master/examples/src/main/scala/pythonconverters) +for examples of using HBase and Cassandra ```InputFormat```. + +Future support for writing data out as ```SequenceFileOutputFormat``` and other ```OutputFormats```, +is forthcoming. + + diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py new file mode 100644 index 0000000000000..39fa6b0d22ef5 --- /dev/null +++ b/examples/src/main/python/cassandra_inputformat.py @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Create data in Cassandra fist +(following: https://wiki.apache.org/cassandra/GettingStarted) + +cqlsh> CREATE KEYSPACE test + ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; +cqlsh> use test; +cqlsh:test> CREATE TABLE users ( + ... user_id int PRIMARY KEY, + ... fname text, + ... lname text + ... ); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1745, 'john', 'smith'); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1744, 'john', 'doe'); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1746, 'john', 'smith'); +cqlsh:test> SELECT * FROM users; + + user_id | fname | lname +---------+-------+------- + 1745 | john | smith + 1744 | john | doe + 1746 | john | smith +""" +if __name__ == "__main__": + if len(sys.argv) != 4: + print >> sys.stderr, """ + Usage: cassandra_inputformat + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py + Assumes you have some data in Cassandra already, running on , in and + """ + exit(-1) + + host = sys.argv[1] + keyspace = sys.argv[2] + cf = sys.argv[3] + sc = SparkContext(appName="CassandraInputFormat") + + conf = {"cassandra.input.thrift.address":host, + "cassandra.input.thrift.port":"9160", + "cassandra.input.keyspace":keyspace, + "cassandra.input.columnfamily":cf, + "cassandra.input.partitioner.class":"Murmur3Partitioner", + "cassandra.input.page.row.size":"3"} + cass_rdd = sc.newAPIHadoopRDD( + "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat", + "java.util.Map", + "java.util.Map", + keyConverter="org.apache.spark.examples.pythonconverters.CassandraCQLKeyConverter", + valueConverter="org.apache.spark.examples.pythonconverters.CassandraCQLValueConverter", + conf=conf) + output = cass_rdd.collect() + for (k, v) in output: + print (k, v) diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py new file mode 100644 index 0000000000000..3289d9880a0f5 --- /dev/null +++ b/examples/src/main/python/hbase_inputformat.py @@ -0,0 +1,72 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Create test data in HBase first: + +hbase(main):016:0> create 'test', 'f1' +0 row(s) in 1.0430 seconds + +hbase(main):017:0> put 'test', 'row1', 'f1', 'value1' +0 row(s) in 0.0130 seconds + +hbase(main):018:0> put 'test', 'row2', 'f1', 'value2' +0 row(s) in 0.0030 seconds + +hbase(main):019:0> put 'test', 'row3', 'f1', 'value3' +0 row(s) in 0.0050 seconds + +hbase(main):020:0> put 'test', 'row4', 'f1', 'value4' +0 row(s) in 0.0110 seconds + +hbase(main):021:0> scan 'test' +ROW COLUMN+CELL + row1 column=f1:, timestamp=1401883411986, value=value1 + row2 column=f1:, timestamp=1401883415212, value=value2 + row3 column=f1:, timestamp=1401883417858, value=value3 + row4 column=f1:, timestamp=1401883420805, value=value4 +4 row(s) in 0.0240 seconds +""" +if __name__ == "__main__": + if len(sys.argv) != 3: + print >> sys.stderr, """ + Usage: hbase_inputformat + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py
+ Assumes you have some data in HBase already, running on , in
+ """ + exit(-1) + + host = sys.argv[1] + table = sys.argv[2] + sc = SparkContext(appName="HBaseInputFormat") + + conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} + hbase_rdd = sc.newAPIHadoopRDD( + "org.apache.hadoop.hbase.mapreduce.TableInputFormat", + "org.apache.hadoop.hbase.io.ImmutableBytesWritable", + "org.apache.hadoop.hbase.client.Result", + valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter", + conf=conf) + output = hbase_rdd.collect() + for (k, v) in output: + print (k, v) diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 9a00701f985f0..71f53af68f4d3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -33,6 +33,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ + /* Need to create following keyspace and column family in cassandra before running this example Start CQL shell using ./bin/cqlsh and execute following commands diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index a8c338480e6e2..4893b017ed819 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -22,7 +22,7 @@ import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark._ -import org.apache.spark.rdd.NewHadoopRDD + object HBaseTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala new file mode 100644 index 0000000000000..29a65c7a5f295 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import java.nio.ByteBuffer +import org.apache.cassandra.utils.ByteBufferUtil +import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap} + + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra + * output to a Map[String, Int] + */ +class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] { + override def convert(obj: Any): java.util.Map[String, Int] = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb))) + } +} + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra + * output to a Map[String, String] + */ +class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] { + override def convert(obj: Any): java.util.Map[String, String] = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb))) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala new file mode 100644 index 0000000000000..42ae960bd64a1 --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.util.Bytes + +/** + * Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result + * to a String + */ +class HBaseConverter extends Converter[Any, String] { + override def convert(obj: Any): String = { + val result = obj.asInstanceOf[Result] + Bytes.toStringBinary(result.value()) + } +} diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 211918f5a05ec..062bec2381a8f 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -342,6 +342,143 @@ def wholeTextFiles(self, path, minPartitions=None): return RDD(self._jsc.wholeTextFiles(path, minPartitions), self, PairDeserializer(UTF8Deserializer(), UTF8Deserializer())) + def _dictToJavaMap(self, d): + jm = self._jvm.java.util.HashMap() + if not d: + d = {} + for k, v in d.iteritems(): + jm[k] = v + return jm + + def sequenceFile(self, path, keyClass=None, valueClass=None, keyConverter=None, + valueConverter=None, minSplits=None): + """ + Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS, + a local file system (available on all nodes), or any Hadoop-supported file system URI. + The mechanism is as follows: + 1. A Java RDD is created from the SequenceFile or other InputFormat, and the key + and value Writable classes + 2. Serialization is attempted via Pyrolite pickling + 3. If this fails, the fallback is to call 'toString' on each key and value + 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side + + @param path: path to sequncefile + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: + @param valueConverter: + @param minSplits: minimum splits in dataset + (default min(2, sc.defaultParallelism)) + """ + minSplits = minSplits or min(self.defaultParallelism, 2) + jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, path, keyClass, valueClass, + keyConverter, valueConverter, minSplits) + return RDD(jrdd, self, PickleSerializer()) + + def newAPIHadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, + a local file system (available on all nodes), or any Hadoop-supported file system URI. + The mechanism is the same as for sc.sequenceFile. + + A Hadoop configuration can be passed in as a Python dict. This will be converted into a + Configuration in Java + + @param path: path to Hadoop file + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + + def newAPIHadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary + Hadoop configuration, which is passed in as a Python dict. + This will be converted into a Configuration in Java. + The mechanism is the same as for sc.sequenceFile. + + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapreduce.lib.input.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + + def hadoopFile(self, path, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, + a local file system (available on all nodes), or any Hadoop-supported file system URI. + The mechanism is the same as for sc.sequenceFile. + + A Hadoop configuration can be passed in as a Python dict. This will be converted into a + Configuration in Java. + + @param path: path to Hadoop file + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapred.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, path, inputFormatClass, keyClass, + valueClass, keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + + def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None, + valueConverter=None, conf=None): + """ + Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary + Hadoop configuration, which is passed in as a Python dict. + This will be converted into a Configuration in Java. + The mechanism is the same as for sc.sequenceFile. + + @param inputFormatClass: fully qualified classname of Hadoop InputFormat + (e.g. "org.apache.hadoop.mapred.TextInputFormat") + @param keyClass: fully qualified classname of key Writable class + (e.g. "org.apache.hadoop.io.Text") + @param valueClass: fully qualified classname of value Writable class + (e.g. "org.apache.hadoop.io.LongWritable") + @param keyConverter: (None by default) + @param valueConverter: (None by default) + @param conf: Hadoop configuration, passed in as a dict + (None by default) + """ + jconf = self._dictToJavaMap(conf) + jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass, valueClass, + keyConverter, valueConverter, jconf) + return RDD(jrdd, self, PickleSerializer()) + def _checkpointFile(self, name, input_deserializer): jrdd = self._jsc.checkpointFile(name) return RDD(jrdd, self, input_deserializer) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 1f2a6ea941cf2..184ee810b861b 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -198,6 +198,151 @@ def func(x): self.sc.parallelize([1]).foreach(func) +class TestInputFormat(PySparkTestCase): + + def setUp(self): + PySparkTestCase.setUp(self) + self.tempdir = tempfile.NamedTemporaryFile(delete=False) + os.unlink(self.tempdir.name) + self.sc._jvm.WriteInputFormatTestDataGenerator.generateData(self.tempdir.name, self.sc._jsc) + + def tearDown(self): + PySparkTestCase.tearDown(self) + shutil.rmtree(self.tempdir.name) + + def test_sequencefiles(self): + basepath = self.tempdir.name + ints = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfint/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + doubles = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfdouble/", + "org.apache.hadoop.io.DoubleWritable", + "org.apache.hadoop.io.Text").collect()) + ed = [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')] + self.assertEqual(doubles, ed) + + text = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sftext/", + "org.apache.hadoop.io.Text", + "org.apache.hadoop.io.Text").collect()) + et = [(u'1', u'aa'), + (u'1', u'aa'), + (u'2', u'aa'), + (u'2', u'bb'), + (u'2', u'bb'), + (u'3', u'cc')] + self.assertEqual(text, et) + + bools = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfbool/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BooleanWritable").collect()) + eb = [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)] + self.assertEqual(bools, eb) + + nulls = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfnull/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.BooleanWritable").collect()) + en = [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)] + self.assertEqual(nulls, en) + + maps = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfmap/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable").collect()) + em = [(1, {2.0: u'aa'}), + (1, {3.0: u'bb'}), + (2, {1.0: u'aa'}), + (2, {1.0: u'cc'}), + (2, {3.0: u'bb'}), + (3, {2.0: u'dd'})] + self.assertEqual(maps, em) + + clazz = sorted(self.sc.sequenceFile(basepath + "/sftestdata/sfclass/", + "org.apache.hadoop.io.Text", + "org.apache.spark.api.python.TestWritable").collect()) + ec = (u'1', + {u'__class__': u'org.apache.spark.api.python.TestWritable', + u'double': 54.0, u'int': 123, u'str': u'test1'}) + self.assertEqual(clazz[0], ec) + + def test_oldhadoop(self): + basepath = self.tempdir.name + ints = sorted(self.sc.hadoopFile(basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") + hello = self.sc.hadoopFile(hellopath, + "org.apache.hadoop.mapred.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text").collect() + result = [(0, u'Hello World!')] + self.assertEqual(hello, result) + + def test_newhadoop(self): + basepath = self.tempdir.name + ints = sorted(self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text").collect()) + ei = [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')] + self.assertEqual(ints, ei) + + hellopath = os.path.join(SPARK_HOME, "python/test_support/hello.txt") + hello = self.sc.newAPIHadoopFile(hellopath, + "org.apache.hadoop.mapreduce.lib.input.TextInputFormat", + "org.apache.hadoop.io.LongWritable", + "org.apache.hadoop.io.Text").collect() + result = [(0, u'Hello World!')] + self.assertEqual(hello, result) + + def test_newolderror(self): + basepath = self.tempdir.name + self.assertRaises(Exception, lambda: self.sc.hadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.SequenceFileInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + def test_bad_inputs(self): + basepath = self.tempdir.name + self.assertRaises(Exception, lambda: self.sc.sequenceFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.io.NotValidWritable", + "org.apache.hadoop.io.Text")) + self.assertRaises(Exception, lambda: self.sc.hadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapred.NotValidInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + self.assertRaises(Exception, lambda: self.sc.newAPIHadoopFile( + basepath + "/sftestdata/sfint/", + "org.apache.hadoop.mapreduce.lib.input.NotValidInputFormat", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.Text")) + + def test_converter(self): + basepath = self.tempdir.name + maps = sorted(self.sc.sequenceFile( + basepath + "/sftestdata/sfmap/", + "org.apache.hadoop.io.IntWritable", + "org.apache.hadoop.io.MapWritable", + valueConverter="org.apache.spark.api.python.TestConverter").collect()) + em = [(1, [2.0]), (1, [3.0]), (2, [1.0]), (2, [1.0]), (2, [3.0]), (3, [2.0])] + self.assertEqual(maps, em) + + class TestDaemon(unittest.TestCase): def connect(self, port): from socket import socket, AF_INET, SOCK_STREAM