Skip to content

Commit

Permalink
Initial WIP of PySpark support for SequenceFile and arbitrary Hadoop …
Browse files Browse the repository at this point in the history
…InputFormat
  • Loading branch information
MLnick committed Dec 9, 2013
1 parent 5d46025 commit d86325f
Show file tree
Hide file tree
Showing 4 changed files with 383 additions and 16 deletions.
305 changes: 297 additions & 8 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,18 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.util.Utils

import org.apache.hadoop.io._
import org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat
import org.apache.spark.api.java.function.PairFunction
import scala.util.{Success, Failure, Try}
import org.msgpack
import org.msgpack.ScalaMessagePack
import org.apache.hadoop.mapreduce.InputFormat

import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.conf.Configuration
import java.util

private[spark] class PythonRDD[T: ClassManifest](
parent: RDD[T],
Expand Down Expand Up @@ -185,7 +196,70 @@ private object SpecialLengths {
val TIMING_DATA = -3
}

private[spark] object PythonRDD {
case class TestClass(var id: String, var number: Int) {
def this() = this("", 0)
}

object TestHadoop extends App {

//PythonRDD.writeToStream((1, "bar"), new DataOutputStream(new FileOutputStream("/tmp/test.out")))


//val n = new NullWritable

import SparkContext._

val path = "/tmp/spark/test/sfarray/"
//val path = "/Users/Nick/workspace/java/faunus/output/job-0/"

val sc = new SparkContext("local[2]", "test")

//val rdd = sc.sequenceFile[NullWritable, FaunusVertex](path)
//val data = Seq((1.0, "aa"), (2.0, "bb"), (2.0, "aa"), (3.0, "cc"), (2.0, "bb"), (1.0, "aa"))
val data = Seq(
(1, Array(1.0, 2.0, 3.0)),
(2, Array(3.0, 4.0, 5.0)),
(3, Array(4.0, 5.0, 6.0))
)
val d = new DoubleWritable(5.0)
val a = new ArrayWritable(classOf[DoubleWritable], Array(d))

val rdd = sc.parallelize(data, numSlices = 2)
//.map({ case (k, v) => (new IntWritable(k), v.map(new DoubleWritable(_))) })
.map{ case (k, v) => (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_)))) }
rdd.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](path)

/*
val data = Seq(
("1", TestClass("test1", 123)),
("2", TestClass("test2", 456)),
("1", TestClass("test3", 123)),
("3", TestClass("test56", 456)),
("2", TestClass("test2", 123))
)
val rdd = sc.parallelize(data, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
rdd.saveAsNewAPIHadoopFile(path,
classOf[Text], classOf[TestClass],
classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestClass]])
//val rdd2 = Seq((1, ))
val seq = sc.sequenceFile[Double, String](path)
val seqR = seq.collect()
val packed = PythonRDD.serMsgPack(rdd)
val packedR = packed.collect()
val packed2 = PythonRDD.serMsgPack(seq)
val packedR2 = packed2.collect()
println(seqR.mkString(","))
println(packedR.mkString(","))
println(packedR2.mkString(","))
*/

}

private[spark] object PythonRDD extends Logging {

def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
JavaRDD[Array[Byte]] = {
Expand All @@ -205,18 +279,233 @@ private[spark] object PythonRDD {
JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
}

def writeToStream(elem: Any, dataOut: DataOutputStream) {
// PySpark / Hadoop InputFormat stuff

def register[T](clazz: Class[T], msgpack: ScalaMessagePack) = {
Try {
if (!clazz.isPrimitive) msgpack.register(clazz)
}.getOrElse(log.warn("Failed to register class (%s) with MsgPack. " +
"Falling back to default MsgPack serialization, or 'toString' as last resort".format(clazz.toString)))
}

// serialize and RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
import org.msgpack.ScalaMessagePack._
val msgpack = new ScalaMessagePack with Serializable
val first = rdd.first()
val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
register(kc, msgpack)
register(vc, msgpack)
/*
Try {
if (!kc.isPrimitive) msgpack.register(kc)
if (!vc.isPrimitive) msgpack.register(vc)
} match {
case Failure(err) => log.warn(("Failed to register key/value class (%s/%s) with MsgPack. " +
"Falling back to default MsgPack serialization, or 'toString' as last resort. " +
"Exception: %s").format(kc, vc, err.getMessage))
}
*/
rdd.map{ pair =>
Try {
msgpack.write(pair)
} match {
case Failure(err) =>
Try {
write((pair._1.toString, pair._2.toString))
} match {
case Success(result) => result
case Failure(e) => throw e
}
case Success(result) => result

}
//write(_)
}
}

// SequenceFile converted to Text and then to String
def sequenceFile[K ,V](sc: JavaSparkContext,
path: String,
keyClass: String,
valueClass: String,
keyWrapper: String,
valueWrapper: String,
minSplits: Int) = {
implicit val kcm = ClassManifest.fromClass(Class.forName(keyClass)).asInstanceOf[ClassManifest[K]]
implicit val vcm = ClassManifest.fromClass(Class.forName(valueClass)).asInstanceOf[ClassManifest[V]]
val kc = kcm.erasure.asInstanceOf[Class[K]]
val vc = vcm.erasure.asInstanceOf[Class[V]]

val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
//JavaRDD.fromRDD(
// .map{ case (a, b) => (a.toString, b.toString) }.map(stuff => write(stuff)))
}

/*
def sequenceFile[K, V](sc: JavaSparkContext,
path: String,
keyWrapper: String,
valueWrapper: String,
minSplits: Int): JavaRDD[Array[Byte]] = {
val rdd = sc.sc.sequenceFile(path, classOf[Any], classOf[Any], minSplits)
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
//sequenceFile(sc, path, "java.lang.String", "java.lang.String", keyWrapper, valueWrapper, minSplits)
}
*/

def mapToConf(map: java.util.HashMap[String, String]) = {
import collection.JavaConversions._
val conf = new Configuration()
map.foreach{ case (k, v) => conf.set(k, v) }
conf
}

/* Merges two configurations, keys from right overwrite any matching keys in left */
def mergeConfs(left: Configuration, right: Configuration) = {
import collection.JavaConversions._
right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))
left
}

// Arbitrary Hadoop InputFormat, key class and value class
def newHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
path: String,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = mergeConfs(baseConf, conf)
val rdd =
newHadoopFileFromClassNames[K, V, F](sc,
path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper, mergedConf)
//.map{ case (k, v) => (k.toString, v.toString) }
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
//JavaPairRDD.fromRDD(
// newHadoopFileFromClassNames(sc, path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper)
// .map(new PairFunction[(K, V), String, String] { def call(t: (K, V)) = (t._1.toString, t._2.toString) } )
//)
}

private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
path: String,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
conf: Configuration) = {
implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
val kc = kcm.erasure.asInstanceOf[Class[K]]
val vc = vcm.erasure.asInstanceOf[Class[V]]
val fc = fcm.erasure.asInstanceOf[Class[F]]
sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
}

/*
private def sequenceFile[K, V](sc: JavaSparkContext,
path: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
minSplits: Int) = {
implicit val kcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[K]]
implicit val vcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[V]]
val kc = kcm.erasure.asInstanceOf[Class[K]]
val vc = vcm.erasure.asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
/*
val rdd = if (kc.isInstanceOf[Writable] && vc.isInstanceOf[Writable]) {
val writables = sc.sc.sequenceFile(path, kc.asInstanceOf[Class[Writable]], vc.asInstanceOf[Class[Writable]], minSplits)
val w = writables.map{case (k,v) => (t.convert(k), t.convert(v))}
//implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K <:< Writable]]
//ClassManifest.fromClass(kc.asInstanceOf[Class[Writable]])
//sequenceFileWritable(sc, path ,minSplits).asInstanceOf[RDD[(K, V)]]
//sequenceFileWritable(sc, kc, vc, path, minSplits)
}
else {
sc.sc.sequenceFile[K, V](path, minSplits)
}
*/
}
*/

private def convertRDD[K, V](rdd: RDD[(K, V)]) = {
rdd.map{
case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
}
}

private def convert(writable: Writable): Any = {
writable match {
case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
case t: Text => SparkContext.stringWritableConverter().convert(t)
case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
case n: NullWritable => None
case aw: ArrayWritable => aw.get().map(convert(_))
case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
case other => other
}
}

/*
def sequenceFileWritable[K, V](sc: JavaSparkContext,
path: String,
minSplits: Int)
//(implicit km: ClassManifest[K], vm: ClassManifest[V])
// kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
= {
import SparkContext._
implicit val kcm = ClassManifest.fromClass(keyClazz) //.asInstanceOf[ClassManifest[K]]
//implicit val vcm = ClassManifest.fromClass(valueClazz) //.asInstanceOf[ClassManifest[V]]
sc.sc.sequenceFile(path) //, kc, vc, minSplits)
// JavaRDD.fromRDD(serMsgPack[K, V](rdd))
}
*/

//

def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
elem match {
case bytes: Array[Byte] =>
dataOut.writeInt(bytes.length)
dataOut.write(bytes)
case pair: (Array[Byte], Array[Byte]) =>
dataOut.writeInt(pair._1.length)
dataOut.write(pair._1)
dataOut.writeInt(pair._2.length)
dataOut.write(pair._2)
case (a: Array[Byte], b: Array[Byte]) =>
dataOut.writeInt(a.length)
dataOut.write(a)
dataOut.writeInt(b.length)
dataOut.write(b)
case str: String =>
dataOut.writeUTF(str)
//case (a: String, b: String) =>
// dataOut.writeUTF(a)
// dataOut.writeUTF(b)
case other =>
throw new SparkException("Unexpected element type " + other.getClass)
}
Expand Down
3 changes: 2 additions & 1 deletion project/SparkBuild.scala
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,8 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.codahale.metrics" % "metrics-graphite" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
"com.twitter" % "chill-java" % "0.3.1",
"org.msgpack" %% "msgpack-scala" % "0.6.8"
)
)

Expand Down
Loading

0 comments on commit d86325f

Please sign in to comment.