
Commit

Adding supporting sequence files for tests. Cleaning up
MLnick committed Dec 15, 2013
1 parent 4b0a43f commit c304cc8
Showing 30 changed files with 213 additions and 112 deletions.
165 changes: 89 additions & 76 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -34,10 +34,10 @@ import org.apache.spark.api.java.function.PairFunction
import scala.util.{Success, Failure, Try}
import org.msgpack
import org.msgpack.ScalaMessagePack
import org.apache.hadoop.mapreduce.InputFormat
import org.apache.hadoop.mapred.InputFormat

import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapred.SequenceFileOutputFormat
import org.apache.hadoop.mapred.{JobConf, SequenceFileOutputFormat}
import org.apache.hadoop.conf.Configuration
import java.util

@@ -196,69 +196,6 @@ private object SpecialLengths {
val TIMING_DATA = -3
}

case class TestClass(var id: String, var number: Int) {
def this() = this("", 0)
}

object TestHadoop extends App {

//PythonRDD.writeToStream((1, "bar"), new DataOutputStream(new FileOutputStream("/tmp/test.out")))


//val n = new NullWritable

import SparkContext._

val path = "/tmp/spark/test/sfarray/"
//val path = "/Users/Nick/workspace/java/faunus/output/job-0/"

val sc = new SparkContext("local[2]", "test")

//val rdd = sc.sequenceFile[NullWritable, FaunusVertex](path)
//val data = Seq((1.0, "aa"), (2.0, "bb"), (2.0, "aa"), (3.0, "cc"), (2.0, "bb"), (1.0, "aa"))
val data = Seq(
(1, Array(1.0, 2.0, 3.0)),
(2, Array(3.0, 4.0, 5.0)),
(3, Array(4.0, 5.0, 6.0))
)
val d = new DoubleWritable(5.0)
val a = new ArrayWritable(classOf[DoubleWritable], Array(d))

val rdd = sc.parallelize(data, numSlices = 2)
//.map({ case (k, v) => (new IntWritable(k), v.map(new DoubleWritable(_))) })
.map{ case (k, v) => (new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_)))) }
rdd.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](path)

/*
val data = Seq(
("1", TestClass("test1", 123)),
("2", TestClass("test2", 456)),
("1", TestClass("test3", 123)),
("3", TestClass("test56", 456)),
("2", TestClass("test2", 123))
)
val rdd = sc.parallelize(data, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
rdd.saveAsNewAPIHadoopFile(path,
classOf[Text], classOf[TestClass],
classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestClass]])
//val rdd2 = Seq((1, ))
val seq = sc.sequenceFile[Double, String](path)
val seqR = seq.collect()
val packed = PythonRDD.serMsgPack(rdd)
val packedR = packed.collect()
val packed2 = PythonRDD.serMsgPack(seq)
val packedR2 = packed2.collect()
println(seqR.mkString(","))
println(packedR.mkString(","))
println(packedR2.mkString(","))
*/

}

private[spark] object PythonRDD extends Logging {

def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
@@ -281,7 +218,7 @@ private[spark] object PythonRDD extends Logging {

// PySpark / Hadoop InputFormat stuff

// SequenceFile
/** Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
def sequenceFile[K ,V](sc: JavaSparkContext,
path: String,
keyClass: String,
@@ -299,8 +236,11 @@
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

// Arbitrary Hadoop InputFormat, key class and value class
def newHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
/**
* Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
* key and value class
*/
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
path: String,
inputFormatClazz: String,
keyClazz: String,
@@ -312,30 +252,103 @@
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
val rdd =
newHadoopFileFromClassNames[K, V, F](sc,
path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper, mergedConf)
newAPIHadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
val converted = SerDeUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

/**
* Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is passed in from Python,
* using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]], key and value class
*/
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
newAPIHadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClazz, keyClazz, valueClazz, conf)
val converted = SerDeUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}
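A minimal sketch of driving this entry point directly from Scala rather than over Py4J (the object name, path and "mapred.input.dir" key are illustrative, and the code would need to sit under the org.apache.spark packages since PythonRDD is private[spark]):

object NewAPIHadoopRDDSketch extends App {
  import org.apache.hadoop.io.{DoubleWritable, Text}
  import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
  import org.apache.spark.api.java.JavaSparkContext

  val jsc = new JavaSparkContext("local[2]", "newapi-rdd-sketch")
  val confMap = new java.util.HashMap[String, String]()
  confMap.put("mapred.input.dir", "/tmp/spark/test/sfdouble/") // illustrative input path
  // Returns a JavaRDD[Array[Byte]] of MsgPack-serialized (key, value) pairs
  val packed = PythonRDD.newAPIHadoopRDD[DoubleWritable, Text, SequenceFileInputFormat[DoubleWritable, Text]](
    jsc,
    "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat",
    "org.apache.hadoop.io.DoubleWritable",
    "org.apache.hadoop.io.Text",
    "", "", // keyWrapper / valueWrapper are not used on this code path
    confMap)
  println(packed.count())
  jsc.stop()
}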

private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
path: String,
private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
path: Option[String] = None,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
conf: Configuration) = {
implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
val kc = kcm.erasure.asInstanceOf[Class[K]]
val vc = vcm.erasure.asInstanceOf[Class[V]]
val fc = fcm.erasure.asInstanceOf[Class[F]]
sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
val rdd = if (path.isDefined) {
sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
} else {
sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
}
rdd
}

def hadoopFile[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
path: String,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
val converted = SerDeUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}
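The old-API counterpart takes the path directly; a similar sketch under the same caveats (illustrative object name, path and class names for a (DoubleWritable, Text) SequenceFile):

object OldAPIHadoopFileSketch extends App {
  import org.apache.hadoop.io.{DoubleWritable, Text}
  import org.apache.hadoop.mapred.SequenceFileInputFormat
  import org.apache.spark.api.java.JavaSparkContext

  val jsc = new JavaSparkContext("local[2]", "oldapi-file-sketch")
  val packed = PythonRDD.hadoopFile[DoubleWritable, Text, SequenceFileInputFormat[DoubleWritable, Text]](
    jsc,
    "/tmp/spark/test/sfdouble/", // illustrative path
    "org.apache.hadoop.mapred.SequenceFileInputFormat",
    "org.apache.hadoop.io.DoubleWritable",
    "org.apache.hadoop.io.Text",
    "", "", // keyWrapper / valueWrapper are again unused
    new java.util.HashMap[String, String]())
  println(packed.count())
  jsc.stop()
}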

def hadoopRDD[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val rdd =
hadoopRDDFromClassNames[K, V, F](sc,
None, inputFormatClazz, keyClazz, valueClazz, conf)
val converted = SerDeUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

//
private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](sc: JavaSparkContext,
path: Option[String] = None,
inputFormatClazz: String,
keyClazz: String,
valueClazz: String,
conf: Configuration) = {
implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K]]
implicit val vcm = ClassManifest.fromClass(Class.forName(valueClazz)).asInstanceOf[ClassManifest[V]]
implicit val fcm = ClassManifest.fromClass(Class.forName(inputFormatClazz)).asInstanceOf[ClassManifest[F]]
val kc = kcm.erasure.asInstanceOf[Class[K]]
val vc = vcm.erasure.asInstanceOf[Class[V]]
val fc = fcm.erasure.asInstanceOf[Class[F]]
val rdd = if (path.isDefined) {
sc.sc.hadoopFile(path.get, fc, kc, vc)
} else {
sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
}
rdd
}

def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
elem match {
64 changes: 41 additions & 23 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -15,43 +15,60 @@ import scala.util.Failure
*/
private[python] object SerDeUtil extends Logging {

/** Attempts to register a class with MsgPack, only if it is not a primitive or a String */
def register[T](clazz: Class[T], msgpack: ScalaMessagePack) {
//implicit val kcm = ClassManifest.fromClass(clazz)
//val kc = kcm.erasure
Try {
log.info("%s".format(clazz))
clazz match {
case c if c.isPrimitive =>
case c if c.isInstanceOf[java.lang.String] =>
case _ => msgpack.register(clazz)
}
}.getOrElse(log.warn("Failed to register class (%s) with MsgPack. ".format(clazz.getName) +
"Falling back to default MsgPack serialization, or 'toString' as last resort"))
//if (kc.isInstance("") || kc.isPrimitive) {
// log.info("Class: %s doesn't need to be registered".format(kc.getName))
//} else {
msgpack.register(clazz)
log.info("Registered key/value class with MsgPack: %s".format(clazz))
//}

} match {
case Failure(err) =>
log.warn("Failed to register class (%s) with MsgPack. ".format(clazz.getName) +
"Falling back to default MsgPack serialization, or 'toString' as last resort. " +
"Error: %s".format(err.getMessage))
case Success(result) =>
}
}

// serialize and RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
/** Serializes an RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack */
def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
import org.msgpack.ScalaMessagePack._
val msgpack = new ScalaMessagePack with Serializable
val first = rdd.first()
val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
register(kc, msgpack)
register(vc, msgpack)
rdd.map{ pair =>
rdd.mapPartitions{ pairs =>
val mp = new ScalaMessagePack
var triedReg = false
pairs.map{ pair =>
Try {
msgpack.write(pair)
if (!triedReg) {
register(pair._1.getClass, mp)
register(pair._2.getClass, mp)
triedReg = true
}
mp.write(pair)
} match {
case Failure(err) =>
Try {
write((pair._1.toString, pair._2.toString))
} match {
case Success(result) => result
case Failure(e) => throw e
}
log.debug("Failed to write", err)
Try {
write((pair._1.toString, pair._2.toString))
} match {
case Success(result) => result
case Failure(e) => throw e
}
case Success(result) => result
}
}
}
}
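A minimal sketch of the serializer on its own (the object name is illustrative, and it is assumed to live in org.apache.spark.api.python since SerDeUtil is private[python]):

object SerMsgPackSketch extends App {
  import org.apache.spark.SparkContext

  val sc = new SparkContext("local[2]", "msgpack-sketch")
  val pairs = sc.parallelize(Seq((1, "aa"), (2, "bb"), (3, "cc")))
  // One MsgPack message per (key, value) pair; falls back to toString serialization on failure
  val packed = SerDeUtil.serMsgPack(pairs)
  println(packed.collect().map(_.length).mkString(","))
  sc.stop()
}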

/**
* Converts an RDD of (K, V) pairs, where K and/or V could be instances of [[org.apache.hadoop.io.Writable]],
* into an RDD[(K, V)]
*/
def convertRDD[K, V](rdd: RDD[(K, V)]) = {
rdd.map{
case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
@@ -61,6 +78,7 @@ private[python] object SerDeUtil extends Logging {
}
}

/** Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or object representation */
def convert(writable: Writable): Any = {
import collection.JavaConversions._
writable match {
@@ -0,0 +1,64 @@
package org.apache.spark.api.python

import org.apache.spark.SparkContext
import org.apache.hadoop.io._
import scala.Array
import java.io.{DataOutput, DataInput}

case class TestWritable(var str: String, var numi: Int, var numd: Double) extends Writable {
def this() = this("", 0, 0.0)

def write(p1: DataOutput) = {
p1.writeUTF(str)
p1.writeInt(numi)
p1.writeDouble(numd)
}

def readFields(p1: DataInput) = {
str = p1.readUTF()
numi = p1.readInt()
numd = p1.readDouble()
}
}

object WriteInputFormatTests extends App {
import SparkContext._

val sc = new SparkContext("local[2]", "test")

val textPath = "../python/test_support/data/sftext/"
val intPath = "../python/test_support/data/sfint/"
val doublePath = "../python/test_support/data/sfdouble/"
val arrPath = "../python/test_support/data/sfarray/"
val classPath = "../python/test_support/data/sfclass/"

val intKeys = Seq((1, "aa"), (2, "bb"), (2, "aa"), (3, "cc"), (2, "bb"), (1, "aa"))
sc.parallelize(intKeys).saveAsSequenceFile(intPath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toDouble, v) }).saveAsSequenceFile(doublePath)
sc.parallelize(intKeys.map{ case (k, v) => (k.toString, v) }).saveAsSequenceFile(textPath)

val data = Seq(
(1, Array(1.0, 2.0, 3.0)),
(2, Array(3.0, 4.0, 5.0)),
(3, Array(4.0, 5.0, 6.0))
)
sc.parallelize(data, numSlices = 2)
.map{ case (k, v) =>
(new IntWritable(k), new ArrayWritable(classOf[DoubleWritable], v.map(new DoubleWritable(_))))
}
.saveAsNewAPIHadoopFile[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[IntWritable, ArrayWritable]](arrPath)

val testClass = Seq(
("1", TestWritable("test1", 123, 54.0)),
("2", TestWritable("test2", 456, 8762.3)),
("1", TestWritable("test3", 123, 423.1)),
("3", TestWritable("test56", 456, 423.5)),
("2", TestWritable("test2", 123, 5435.2))
)
val rdd = sc.parallelize(testClass, numSlices = 2).map{ case (k, v) => (new Text(k), v) }
rdd.saveAsNewAPIHadoopFile(classPath,
classOf[Text], classOf[TestWritable],
classOf[org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat[Text, TestWritable]])


}
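For a quick sanity check, lines like these could be appended inside the App body above, reusing its sc and path vals (a sketch; the implicit WritableConverters come from the SparkContext._ import already present there):

  val ints = sc.sequenceFile[Int, String](intPath)
  val doubles = sc.sequenceFile[Double, String](doublePath)
  val texts = sc.sequenceFile[String, String](textPath)
  println(ints.collect().mkString(","))
  println(doubles.collect().mkString(","))
  println(texts.collect().mkString(","))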