From d86325f394e536c6b2d5cb86d08e35d508fb23d7 Mon Sep 17 00:00:00 2001
From: Nick Pentreath
Date: Mon, 9 Dec 2013 08:53:11 +0200
Subject: [PATCH] Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop InputFormat

---
 .../apache/spark/api/python/PythonRDD.scala | 305 +++++++++++++++++-
 project/SparkBuild.scala                    |   3 +-
 python/pyspark/context.py                   |  57 +++-
 python/pyspark/serializers.py               |  34 +-
 4 files changed, 383 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 132e4fb0d2cad..5363163618232 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -28,7 +28,18 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.Utils
-
+import org.apache.hadoop.io._
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat
+import org.apache.spark.api.java.function.PairFunction
+import scala.util.{Success, Failure, Try}
+import org.msgpack
+import org.msgpack.ScalaMessagePack
+import org.apache.hadoop.mapreduce.InputFormat
+
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.conf.Configuration
+import java.util
 
 private[spark] class PythonRDD[T: ClassManifest](
     parent: RDD[T],
@@ -185,7 +196,70 @@ private object SpecialLengths {
   val TIMING_DATA = -3
 }
 
-private[spark] object PythonRDD {
+case class TestClass(var id: String, var number: Int) {
+  def this() = this("", 0)
+}
+
+object TestHadoop extends App {
+
+  //PythonRDD.writeToStream((1, "bar"), new DataOutputStream(new FileOutputStream("/tmp/test.out")))
+
+
+  //val n = new NullWritable
+
+  import SparkContext._
+
+  val path = "/tmp/spark/test/sfarray/"
+  //val path = "/Users/Nick/workspace/java/faunus/output/job-0/"
+
+  val sc = new SparkContext("local[2]", "test")
+
+  //val rdd = sc.sequenceFile[NullWritable, FaunusVertex](path)
+  //val data = Seq((1.0, "aa"), (2.0, "bb"), (2.0, "aa"), (3.0, "cc"), (2.0, "bb"), (1.0, "aa"))
+  val data = Seq(
+    (1, Array(1.0, 2.0, 3.0)),
+    (2, Array(3.0, 4.0, 5.0)),
+    (3, Array(4.0, 5.0, 6.0))
+  )
+  val d = new DoubleWritable(5.0)
+  val a = new ArrayWritable(classOf[DoubleWritable], Array(d))
+
+  val rdd = sc.parallelize(data, numSlices = 2)
+    //.map({ case (k, v) => (new IntWritable(k), v.map(new DoubleWritable(_))) })
+    .map{ case (k, v) => (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_)))) }
+  rdd.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](path)
+
+  /*
+  val data = Seq(
+    ("1", TestClass("test1", 123)),
+    ("2", TestClass("test2", 456)),
+    ("1", TestClass("test3", 123)),
+    ("3", TestClass("test56", 456)),
+    ("2", TestClass("test2", 123))
+  )
+  val rdd = sc.parallelize(data, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
+  rdd.saveAsNewAPIHadoopFile(path,
+    classOf[Text], classOf[TestClass],
+    classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestClass]])
+
+  //val rdd2 = Seq((1, ))
+
+  val seq = sc.sequenceFile[Double, String](path)
+  val seqR = seq.collect()
+
+  val packed = PythonRDD.serMsgPack(rdd)
+  val packedR = packed.collect()
+  val packed2 = PythonRDD.serMsgPack(seq)
+  val packedR2 = packed2.collect()
+
+  println(seqR.mkString(","))
println(packedR.mkString(",")) + println(packedR2.mkString(",")) + */ + +} + +private[spark] object PythonRDD extends Logging { def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int): JavaRDD[Array[Byte]] = { @@ -205,18 +279,233 @@ private[spark] object PythonRDD { JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism)) } - def writeToStream(elem: Any, dataOut: DataOutputStream) { + // PySpark / Hadoop InputFormat stuff + + def register[T](clazz: Class[T], msgpack: ScalaMessagePack) = { + Try { + if (!clazz.isPrimitive) msgpack.register(clazz) + }.getOrElse(log.warn("Failed to register class (%s) with MsgPack. " + + "Falling back to default MsgPack serialization, or 'toString' as last resort".format(clazz.toString))) + } + + // serialize and RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack + def serMsgPack[K, V](rdd: RDD[(K, V)]) = { + import org.msgpack.ScalaMessagePack._ + val msgpack = new ScalaMessagePack with Serializable + val first = rdd.first() + val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]] + val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]] + register(kc, msgpack) + register(vc, msgpack) + /* + Try { + if (!kc.isPrimitive) msgpack.register(kc) + if (!vc.isPrimitive) msgpack.register(vc) + } match { + case Failure(err) => log.warn(("Failed to register key/value class (%s/%s) with MsgPack. " + + "Falling back to default MsgPack serialization, or 'toString' as last resort. " + + "Exception: %s").format(kc, vc, err.getMessage)) + } + */ + rdd.map{ pair => + Try { + msgpack.write(pair) + } match { + case Failure(err) => + Try { + write((pair._1.toString, pair._2.toString)) + } match { + case Success(result) => result + case Failure(e) => throw e + } + case Success(result) => result + + } + //write(_) + } + } + + // SequenceFile converted to Text and then to String + def sequenceFile[K ,V](sc: JavaSparkContext, + path: String, + keyClass: String, + valueClass: String, + keyWrapper: String, + valueWrapper: String, + minSplits: Int) = { + implicit val kcm = ClassManifest.fromClass(Class.forName(keyClass)).asInstanceOf[ClassManifest[K]] + implicit val vcm = ClassManifest.fromClass(Class.forName(valueClass)).asInstanceOf[ClassManifest[V]] + val kc = kcm.erasure.asInstanceOf[Class[K]] + val vc = vcm.erasure.asInstanceOf[Class[V]] + + val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits) + val converted = convertRDD[K, V](rdd) + JavaRDD.fromRDD(serMsgPack[K, V](converted)) + //JavaRDD.fromRDD( + // .map{ case (a, b) => (a.toString, b.toString) }.map(stuff => write(stuff))) + } + + /* + def sequenceFile[K, V](sc: JavaSparkContext, + path: String, + keyWrapper: String, + valueWrapper: String, + minSplits: Int): JavaRDD[Array[Byte]] = { + val rdd = sc.sc.sequenceFile(path, classOf[Any], classOf[Any], minSplits) + val converted = convertRDD[K, V](rdd) + JavaRDD.fromRDD(serMsgPack[K, V](converted)) + //sequenceFile(sc, path, "java.lang.String", "java.lang.String", keyWrapper, valueWrapper, minSplits) + } + */ + + def mapToConf(map: java.util.HashMap[String, String]) = { + import collection.JavaConversions._ + val conf = new Configuration() + map.foreach{ case (k, v) => conf.set(k, v) } + conf + } + + /* Merges two configurations, keys from right overwrite any matching keys in left */ + def mergeConfs(left: Configuration, right: Configuration) = { + import collection.JavaConversions._ + right.iterator().foreach(entry => left.set(entry.getKey, 
+    right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))
+    left
+  }
+
+  // Arbitrary Hadoop InputFormat, key class and value class
+  def newHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+      path: String,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = mergeConfs(baseConf, conf)
+    val rdd =
+      newHadoopFileFromClassNames[K, V, F](sc,
+        path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper, mergedConf)
+      //.map{ case (k, v) => (k.toString, v.toString) }
+    val converted = convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(serMsgPack[K, V](converted))
+    //JavaPairRDD.fromRDD(
+    //  newHadoopFileFromClassNames(sc, path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper)
+    //    .map(new PairFunction[(K, V), String, String] { def call(t: (K, V)) = (t._1.toString, t._2.toString) } )
+    //)
+  }
+
+  private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
+      path: String,
+      inputFormatClazz: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
+    implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
+    implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
+    val kc = kcm.erasure.asInstanceOf[Class[K]]
+    val vc = vcm.erasure.asInstanceOf[Class[V]]
+    val fc = fcm.erasure.asInstanceOf[Class[F]]
+    sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
+  }
+
+  /*
+  private def sequenceFile[K, V](sc: JavaSparkContext,
+      path: String,
+      keyClazz: String,
+      valueClazz: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      minSplits: Int) = {
+    implicit val kcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[K]]
+    implicit val vcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[V]]
+    val kc = kcm.erasure.asInstanceOf[Class[K]]
+    val vc = vcm.erasure.asInstanceOf[Class[V]]
+
+    val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
+    val converted = convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(serMsgPack[K, V](converted))
+
+    /*
+    val rdd = if (kc.isInstanceOf[Writable] && vc.isInstanceOf[Writable]) {
+      val writables = sc.sc.sequenceFile(path, kc.asInstanceOf[Class[Writable]], vc.asInstanceOf[Class[Writable]], minSplits)
+      val w = writables.map{case (k,v) => (t.convert(k), t.convert(v))}
+      //implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K <:< Writable]]
+      //ClassManifest.fromClass(kc.asInstanceOf[Class[Writable]])
+      //sequenceFileWritable(sc, path ,minSplits).asInstanceOf[RDD[(K, V)]]
+      //sequenceFileWritable(sc, kc, vc, path, minSplits)
+    }
+    else {
+      sc.sc.sequenceFile[K, V](path, minSplits)
+
+    }
+
+    */
+  }
+  */
+
+  private def convertRDD[K, V](rdd: RDD[(K, V)]) = {
+    rdd.map{
+      case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
+      case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
+      case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
+      case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
+    }
+  }
+
+  private def convert(writable: Writable): Any = {
+    writable match {
+      case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
+      case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
+      case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
+      case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
+      case t: Text => SparkContext.stringWritableConverter().convert(t)
+      case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
+      case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
+      case n: NullWritable => None
+      case aw: ArrayWritable => aw.get().map(convert(_))
+      case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
+      case other => other
+    }
+  }
+
+  /*
+  def sequenceFileWritable[K, V](sc: JavaSparkContext,
+      path: String,
+      minSplits: Int)
+      //(implicit km: ClassManifest[K], vm: ClassManifest[V])
+      // kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
+      = {
+
+    import SparkContext._
+    implicit val kcm = ClassManifest.fromClass(keyClazz) //.asInstanceOf[ClassManifest[K]]
+    //implicit val vcm = ClassManifest.fromClass(valueClazz) //.asInstanceOf[ClassManifest[V]]
+    sc.sc.sequenceFile(path) //, kc, vc, minSplits)
+    // JavaRDD.fromRDD(serMsgPack[K, V](rdd))
+  }
+  */
+
+  //
+
+  def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
     elem match {
       case bytes: Array[Byte] =>
         dataOut.writeInt(bytes.length)
         dataOut.write(bytes)
-      case pair: (Array[Byte], Array[Byte]) =>
-        dataOut.writeInt(pair._1.length)
-        dataOut.write(pair._1)
-        dataOut.writeInt(pair._2.length)
-        dataOut.write(pair._2)
+      case (a: Array[Byte], b: Array[Byte]) =>
+        dataOut.writeInt(a.length)
+        dataOut.write(a)
+        dataOut.writeInt(b.length)
+        dataOut.write(b)
       case str: String =>
         dataOut.writeUTF(str)
+      //case (a: String, b: String) =>
+      //  dataOut.writeUTF(a)
+      //  dataOut.writeUTF(b)
       case other =>
         throw new SparkException("Unexpected element type " + other.getClass)
     }
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index ac87cffd9fbd4..25ce8b6fbdea3 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -248,7 +248,8 @@ object SparkBuild extends Build {
         "com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
         "com.codahale.metrics" % "metrics-graphite" % "3.0.0",
         "com.twitter" % "chill_2.9.3" % "0.3.1",
-        "com.twitter" % "chill-java" % "0.3.1"
+        "com.twitter" % "chill-java" % "0.3.1",
+        "org.msgpack" %% "msgpack-scala" % "0.6.8"
       )
     )
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index cbd41e58c4a78..4e38fec94d755 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -26,7 +26,7 @@ from pyspark.broadcast import Broadcast
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, MUTF8Deserializer, PairMUTF8Deserializer, MsgPackDeserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark.rdd import RDD
 
@@ -51,7 +51,7 @@ class SparkContext(object):
 
     def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
-            environment=None, batchSize=1024, serializer=PickleSerializer()):
+                 environment=None, batchSize=1024, serializer=PickleSerializer()):
         """
         Create a new SparkContext.
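
[Editor's note, not part of the patch: a rough, self-contained sketch of the wire format this WIP relies on. The Scala side MsgPack-encodes each converted (key, value) record and writeToStream() frames it with a 4-byte big-endian length; the Python side unframes each record and calls msgpack.loads(), much as the MsgPackDeserializer added to serializers.py below does. The helper names here (write_framed, read_framed) are illustrative only.]

import struct
from io import BytesIO

import msgpack  # msgpack-python, the same package serializers.py imports below


def write_framed(records, stream):
    # Roughly mimics PythonRDD.writeToStream() for Array[Byte] payloads:
    # 4-byte big-endian length prefix, then the MsgPack payload.
    for record in records:
        payload = msgpack.dumps(record)
        stream.write(struct.pack(">i", len(payload)))
        stream.write(payload)


def read_framed(stream):
    # Read length-prefixed frames and decode each with MsgPack,
    # as a framed deserializer on the Python side would.
    while True:
        header = stream.read(4)
        if len(header) < 4:
            return
        (length,) = struct.unpack(">i", header)
        yield msgpack.loads(stream.read(length), use_list=0)


if __name__ == "__main__":
    buf = BytesIO()
    write_framed([(1, "aa"), (2, "bb")], buf)
    buf.seek(0)
    print(list(read_framed(buf)))  # e.g. [(1, 'aa'), (2, 'bb')]
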
@@ -95,15 +95,15 @@ def __init__(self, master, jobName, sparkHome=None, pyFiles=None,
         # Create the Java SparkContext through Py4J
         empty_string_array = self._gateway.new_array(self._jvm.String, 0)
         self._jsc = self._jvm.JavaSparkContext(master, jobName, sparkHome,
-            empty_string_array)
+                                               empty_string_array)
 
         # Create a single Accumulator in Java that we'll send all our updates through;
         # they will be passed back to us through a TCP server
         self._accumulatorServer = accumulators._start_update_server()
         (host, port) = self._accumulatorServer.server_address
         self._javaAccumulator = self._jsc.accumulator(
-            self._jvm.java.util.ArrayList(),
-            self._jvm.PythonAccumulatorParam(host, port))
+                self._jvm.java.util.ArrayList(),
+                self._jvm.PythonAccumulatorParam(host, port))
 
         self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
         # Broadcast's __reduce__ method stores Broadcast instances here.
@@ -213,6 +213,49 @@ def textFile(self, name, minSplits=None):
         return RDD(self._jsc.textFile(name, minSplits), self,
                    MUTF8Deserializer())
 
+    ###
+    def sequenceFileAsText(self, name):
+        """
+        Read a Hadoop SequenceFile with arbitrary key and value class from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI,
+        and return it as an RDD of (String, String) where the key and value representations
+        are generated using the 'toString()' method of the relevant Java class.
+        """
+        #minSplits = minSplits or min(self.defaultParallelism, 2)
+        jrdd = self._jvm.PythonRDD.sequenceFileAsText(self._jsc, name)
+        return RDD(jrdd, self, MsgPackDeserializer())  # MsgPackDeserializer PairMUTF8Deserializer
+
+    def sequenceFile(self, name, keyClass, valueClass, keyWrapper="", valueWrapper="", minSplits=None):
+        """
+        Read a Hadoop SequenceFile with arbitrary key and value class from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI,
+        and return it as an RDD of (key, value) pairs, with Writables converted to Java types
+        on the Scala side and deserialized in Python via MsgPack.
+
+        >>> sc.sequenceFile("/tmp/spark/test/sfint/").collect()
+        [(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
+        """
+        minSplits = minSplits or min(self.defaultParallelism, 2)
+        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyClass, valueClass, keyWrapper, valueWrapper, minSplits)
+        #jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyWrapper, valueWrapper, minSplits)
+        return RDD(jrdd, self, MsgPackDeserializer())  # MsgPackDeserializer PairMUTF8Deserializer
+
+    def newHadoopFile(self, name, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString", conf = {}):
+        """
+        Read a Hadoop file with arbitrary InputFormat, key and value class from HDFS,
+        a local file system (available on all nodes), or any Hadoop-supported file system URI,
+        and return it as an RDD of (String, String), where the key and value representations
+        are generated using the 'toString()' method of the relevant Java class.
+ """ + jconf = self._jvm.java.util.HashMap() + for k, v in conf.iteritems(): + jconf[k] = v + jrdd = self._jvm.PythonRDD.newHadoopFile(self._jsc, name, inputFormat, keyClass, valueClass, keyWrapper, + valueWrapper, jconf) + return RDD(jrdd, self, MsgPackDeserializer()) + + ### + def _checkpointFile(self, name, input_deserializer): jrdd = self._jsc.checkpointFile(name) return RDD(jrdd, self, input_deserializer) @@ -344,12 +387,14 @@ def _getJavaStorageLevel(self, storageLevel): newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory, - storageLevel.deserialized, storageLevel.replication) + storageLevel.deserialized, storageLevel.replication) + def _test(): import atexit import doctest import tempfile + globs = globals().copy() globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) globs['tempdir'] = tempfile.mkdtemp() diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 811fa6f018b23..fad9b55ffcef6 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -64,6 +64,7 @@ from itertools import chain, izip, product import marshal import struct +import msgpack from pyspark import cloudpickle @@ -260,7 +261,6 @@ class MarshalSerializer(FramedSerializer): dumps = marshal.dumps loads = marshal.loads - class MUTF8Deserializer(Serializer): """ Deserializes streams written by Java's DataOutputStream.writeUTF(). @@ -280,6 +280,38 @@ def load_stream(self, stream): return +class PairMUTF8Deserializer(Serializer): + """ + Deserializes streams of tuples written by Java's DataOutputStream.writeUTF(). + """ + def loads(self, stream): + l1 = struct.unpack('>H', stream.read(2))[0] + a = stream.read(l1).decode('utf8') + l2 = struct.unpack('>H', stream.read(2))[0] + b = stream.read(l2).decode('utf8') + return (a, b) + + def load_stream(self, stream): + while True: + try: + yield self.loads(stream) + except struct.error: + return + except EOFError: + return + +class MsgPackDeserializer(FramedSerializer): + """ + + """ + def loads(self, obj): + return msgpack.loads(obj, use_list=0) + + def dumps(self, obj): + return msgpack.dumps(obj) + + + def read_long(stream): length = stream.read(8) if length == "":