Commit 4b0a43f
Refactoring utils into own objects. Cleaning up old commented-out code
MLnick committed Dec 12, 2013
1 parent d86325f commit 4b0a43f
Showing 20 changed files with 119 additions and 168 deletions.
24 changes: 24 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -0,0 +1,24 @@
package org.apache.spark.api.python

import org.apache.hadoop.conf.Configuration

/**
 * Utilities for working with Python objects -> Hadoop-related objects
 */
private[python] object PythonHadoopUtil {

  def mapToConf(map: java.util.HashMap[String, String]) = {
    import collection.JavaConversions._
    val conf = new Configuration()
    map.foreach{ case (k, v) => conf.set(k, v) }
    conf
  }

  /* Merges two configurations, keys from right overwrite any matching keys in left */
  def mergeConfs(left: Configuration, right: Configuration) = {
    import collection.JavaConversions._
    right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))
    left
  }

}
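
Note: for orientation, a minimal usage sketch of the new PythonHadoopUtil object. This is hypothetical and not part of the commit: it assumes placement in the org.apache.spark.api.python package (the object is private[python]), and the property key/value are made up.

// Hypothetical usage sketch for PythonHadoopUtil (not part of this commit).
package org.apache.spark.api.python

import org.apache.hadoop.conf.Configuration

object PythonHadoopUtilSketch {
  def main(args: Array[String]) {
    // A conf sent from PySpark arrives over Py4J as a java.util.HashMap[String, String]
    // (see PythonRDD.newHadoopFile below).
    val fromPython = new java.util.HashMap[String, String]()
    fromPython.put("my.example.key", "my-example-value")  // illustrative property

    // Turn the map into a Hadoop Configuration
    val overrides: Configuration = PythonHadoopUtil.mapToConf(fromPython)

    // Merge into a base conf; keys from the right-hand conf overwrite matching keys in the left
    val base = new Configuration()
    val merged = PythonHadoopUtil.mergeConfs(base, overrides)

    println(merged.get("my.example.key"))  // prints "my-example-value"
  }
}
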
171 changes: 7 additions & 164 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -281,51 +281,7 @@ private[spark] object PythonRDD extends Logging {

// PySpark / Hadoop InputFormat stuff

def register[T](clazz: Class[T], msgpack: ScalaMessagePack) = {
Try {
if (!clazz.isPrimitive) msgpack.register(clazz)
}.getOrElse(log.warn("Failed to register class (%s) with MsgPack. " +
"Falling back to default MsgPack serialization, or 'toString' as last resort".format(clazz.toString)))
}

// serialize and RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
import org.msgpack.ScalaMessagePack._
val msgpack = new ScalaMessagePack with Serializable
val first = rdd.first()
val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
register(kc, msgpack)
register(vc, msgpack)
/*
Try {
if (!kc.isPrimitive) msgpack.register(kc)
if (!vc.isPrimitive) msgpack.register(vc)
} match {
case Failure(err) => log.warn(("Failed to register key/value class (%s/%s) with MsgPack. " +
"Falling back to default MsgPack serialization, or 'toString' as last resort. " +
"Exception: %s").format(kc, vc, err.getMessage))
}
*/
rdd.map{ pair =>
Try {
msgpack.write(pair)
} match {
case Failure(err) =>
Try {
write((pair._1.toString, pair._2.toString))
} match {
case Success(result) => result
case Failure(e) => throw e
}
case Success(result) => result

}
//write(_)
}
}

// SequenceFile converted to Text and then to String
// SequenceFile
def sequenceFile[K ,V](sc: JavaSparkContext,
path: String,
keyClass: String,
@@ -339,37 +295,8 @@ private[spark] object PythonRDD extends Logging {
val vc = vcm.erasure.asInstanceOf[Class[V]]

val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
//JavaRDD.fromRDD(
// .map{ case (a, b) => (a.toString, b.toString) }.map(stuff => write(stuff)))
}

/*
def sequenceFile[K, V](sc: JavaSparkContext,
path: String,
keyWrapper: String,
valueWrapper: String,
minSplits: Int): JavaRDD[Array[Byte]] = {
val rdd = sc.sc.sequenceFile(path, classOf[Any], classOf[Any], minSplits)
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
//sequenceFile(sc, path, "java.lang.String", "java.lang.String", keyWrapper, valueWrapper, minSplits)
}
*/

def mapToConf(map: java.util.HashMap[String, String]) = {
import collection.JavaConversions._
val conf = new Configuration()
map.foreach{ case (k, v) => conf.set(k, v) }
conf
}

/* Merges two configurations, keys from right overwrite any matching keys in left */
def mergeConfs(left: Configuration, right: Configuration) = {
import collection.JavaConversions._
right.iterator().foreach(entry => left.set(entry.getKey, entry.getValue))
left
val converted = SerDeUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

// Arbitrary Hadoop InputFormat, key class and value class
@@ -381,19 +308,14 @@ private[spark] object PythonRDD extends Logging {
keyWrapper: String,
valueWrapper: String,
confAsMap: java.util.HashMap[String, String]) = {
val conf = mapToConf(confAsMap)
val conf = PythonHadoopUtil.mapToConf(confAsMap)
val baseConf = sc.hadoopConfiguration()
val mergedConf = mergeConfs(baseConf, conf)
val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
val rdd =
newHadoopFileFromClassNames[K, V, F](sc,
path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper, mergedConf)
//.map{ case (k, v) => (k.toString, v.toString) }
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
//JavaPairRDD.fromRDD(
// newHadoopFileFromClassNames(sc, path, inputFormatClazz, keyClazz, valueClazz, keyWrapper, valueWrapper)
// .map(new PairFunction[(K, V), String, String] { def call(t: (K, V)) = (t._1.toString, t._2.toString) } )
//)
val converted = SerDeUtil.convertRDD[K, V](rdd)
JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
}

private def newHadoopFileFromClassNames[K, V, F <: NewInputFormat[K, V]](sc: JavaSparkContext,
@@ -413,82 +335,6 @@ private[spark] object PythonRDD extends Logging {
sc.sc.newAPIHadoopFile(path, fc, kc, vc, conf)
}

/*
private def sequenceFile[K, V](sc: JavaSparkContext,
path: String,
keyClazz: String,
valueClazz: String,
keyWrapper: String,
valueWrapper: String,
minSplits: Int) = {
implicit val kcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[K]]
implicit val vcm = ClassManifest.fromClass(Class.forName("Any")).asInstanceOf[ClassManifest[V]]
val kc = kcm.erasure.asInstanceOf[Class[K]]
val vc = vcm.erasure.asInstanceOf[Class[V]]
val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
val converted = convertRDD[K, V](rdd)
JavaRDD.fromRDD(serMsgPack[K, V](converted))
/*
val rdd = if (kc.isInstanceOf[Writable] && vc.isInstanceOf[Writable]) {
val writables = sc.sc.sequenceFile(path, kc.asInstanceOf[Class[Writable]], vc.asInstanceOf[Class[Writable]], minSplits)
val w = writables.map{case (k,v) => (t.convert(k), t.convert(v))}
//implicit val kcm = ClassManifest.fromClass(Class.forName(keyClazz)).asInstanceOf[ClassManifest[K <:< Writable]]
//ClassManifest.fromClass(kc.asInstanceOf[Class[Writable]])
//sequenceFileWritable(sc, path ,minSplits).asInstanceOf[RDD[(K, V)]]
//sequenceFileWritable(sc, kc, vc, path, minSplits)
}
else {
sc.sc.sequenceFile[K, V](path, minSplits)
}
*/
}
*/

private def convertRDD[K, V](rdd: RDD[(K, V)]) = {
rdd.map{
case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
}
}

private def convert(writable: Writable): Any = {
writable match {
case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
case t: Text => SparkContext.stringWritableConverter().convert(t)
case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
case n: NullWritable => None
case aw: ArrayWritable => aw.get().map(convert(_))
case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
case other => other
}
}

/*
def sequenceFileWritable[K, V](sc: JavaSparkContext,
path: String,
minSplits: Int)
//(implicit km: ClassManifest[K], vm: ClassManifest[V])
// kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
= {
import SparkContext._
implicit val kcm = ClassManifest.fromClass(keyClazz) //.asInstanceOf[ClassManifest[K]]
//implicit val vcm = ClassManifest.fromClass(valueClazz) //.asInstanceOf[ClassManifest[V]]
sc.sc.sequenceFile(path) //, kc, vc, minSplits)
// JavaRDD.fromRDD(serMsgPack[K, V](rdd))
}
*/

//

def writeToStream(elem: Any, dataOut: DataOutputStream)(implicit m: ClassManifest[Any]) {
@@ -503,9 +349,6 @@ private[spark] object PythonRDD extends Logging {
dataOut.write(b)
case str: String =>
dataOut.writeUTF(str)
//case (a: String, b: String) =>
// dataOut.writeUTF(a)
// dataOut.writeUTF(b)
case other =>
throw new SparkException("Unexpected element type " + other.getClass)
}
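
Note: underneath these wrappers the actual read is still SparkContext.newAPIHadoopFile (the call at the end of newHadoopFileFromClassNames above). The following is an illustrative, self-contained sketch of that underlying call in plain Spark, not part of the commit; the local-mode setup, input path and conf key are placeholders, and the key/value classes are hard-coded instead of loaded reflectively.

// Illustrative sketch only (not part of this commit): the plain-Spark equivalent of the
// newHadoopFileFromClassNames path, with hard-coded classes and a placeholder input path.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkContext

object NewHadoopFileSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "NewHadoopFileSketch")

    // Per-job overrides, analogous to what PythonHadoopUtil.mergeConfs folds into the base conf
    val conf = new Configuration(sc.hadoopConfiguration)
    conf.set("my.example.key", "my-example-value")  // illustrative override

    val rdd = sc.newAPIHadoopFile("/tmp/some/input.txt",  // placeholder path
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)

    // The same Writable -> Scala idea as SerDeUtil.convertRDD, inlined for two known types
    val converted = rdd.map { case (k, v) => (k.get(), v.toString) }
    println(converted.take(3).toList)

    sc.stop()
  }
}
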
81 changes: 81 additions & 0 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -0,0 +1,81 @@
package org.apache.spark.api.python

import org.msgpack.ScalaMessagePack
import scala.util.Try
import org.apache.spark.rdd.RDD
import java.io.Serializable
import org.apache.spark.{SparkContext, Logging}
import org.apache.hadoop.io._
import scala.util.Success
import scala.util.Failure

/**
 * Utilities for serialization / deserialization between Python and Java, using MsgPack.
 * Also contains utilities for converting [[org.apache.hadoop.io.Writable]] -> Scala objects and primitives
 */
private[python] object SerDeUtil extends Logging {

  def register[T](clazz: Class[T], msgpack: ScalaMessagePack) {
    Try {
      log.info("Registering key/value class with MsgPack: %s".format(clazz))
      clazz match {
        case c if c.isPrimitive =>                   // primitives are handled natively by MsgPack
        case c if c == classOf[java.lang.String] =>  // strings need no template registration
        case _ => msgpack.register(clazz)
      }
    }.getOrElse(log.warn("Failed to register class (%s) with MsgPack. ".format(clazz.getName) +
      "Falling back to default MsgPack serialization, or 'toString' as last resort"))
  }

  // serialize an RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack
  def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
    import org.msgpack.ScalaMessagePack._
    val msgpack = new ScalaMessagePack with Serializable
    val first = rdd.first()
    val kc = ClassManifest.fromClass(first._1.getClass).asInstanceOf[ClassManifest[K]].erasure.asInstanceOf[Class[K]]
    val vc = ClassManifest.fromClass(first._2.getClass).asInstanceOf[ClassManifest[V]].erasure.asInstanceOf[Class[V]]
    register(kc, msgpack)
    register(vc, msgpack)
    rdd.map{ pair =>
      Try {
        msgpack.write(pair)
      } match {
        case Failure(err) =>  // fall back to serializing the pair's toString representations
          Try {
            write((pair._1.toString, pair._2.toString))
          } match {
            case Success(result) => result
            case Failure(e) => throw e
          }
        case Success(result) => result
      }
    }
  }

  def convertRDD[K, V](rdd: RDD[(K, V)]) = {
    rdd.map{
      case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
      case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
      case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
      case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
    }
  }

  def convert(writable: Writable): Any = {
    import collection.JavaConversions._
    writable match {
      case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
      case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
      case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
      case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
      case t: Text => SparkContext.stringWritableConverter().convert(t)
      case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
      case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
      case n: NullWritable => None
      case aw: ArrayWritable => aw.get().map(convert(_))
      case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
      case other => other
    }
  }

}
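
Note: a quick illustration of what SerDeUtil.convert yields for common Writables. Again a hypothetical snippet, not part of the commit; it assumes placement in the org.apache.spark.api.python package because SerDeUtil is private[python].

// Hypothetical sketch (not part of this commit): expected results of SerDeUtil.convert.
package org.apache.spark.api.python

import org.apache.hadoop.io._

object SerDeUtilSketch {
  def main(args: Array[String]) {
    println(SerDeUtil.convert(new IntWritable(42)))       // 42    (Int)
    println(SerDeUtil.convert(new Text("hello")))         // hello (String)
    println(SerDeUtil.convert(new DoubleWritable(3.14)))  // 3.14  (Double)
    println(SerDeUtil.convert(NullWritable.get()))        // None
    val mw = new MapWritable()
    mw.put(new Text("k"), new IntWritable(1))
    println(SerDeUtil.convert(mw))                        // Map(k -> 1)
  }
}
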
11 changes: 7 additions & 4 deletions python/pyspark/context.py
@@ -225,22 +225,25 @@ def sequenceFileAsText(self, name):
jrdd = self._jvm.PythonRDD.sequenceFileAsText(self._jsc, name)
return RDD(jrdd, self, MsgPackDeserializer()) # MsgPackDeserializer PairMUTF8Deserializer

def sequenceFile(self, name, keyClass, valueClass, keyWrapper="", valueWrapper="", minSplits=None):
def sequenceFile(self, name, keyClass="org.apache.hadoop.io.Text", valueClass="org.apache.hadoop.io.Text",
keyWrapper="", valueWrapper="", minSplits=None):
"""
Read a Hadoop SequenceFile with arbitrary key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI,
and return it as an RDD of (String, String) where the key and value representations
are generated using the 'toString()' method of the relevant Java class.
>>> sc.sequenceFile("/tmp/spark/test/sfint/").collect()
>>> sc.sequenceFile("test_support/data/sfint/").collect()
[(1, 'aa'), (2, 'bb'), (2, 'aa'), (3, 'cc'), (2, 'bb'), (1, 'aa')]
"""
minSplits = minSplits or min(self.defaultParallelism, 2)
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyClass, valueClass, keyWrapper, valueWrapper, minSplits)
jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyClass, valueClass, keyWrapper, valueWrapper,
minSplits)
#jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, keyWrapper, valueWrapper, minSplits)
return RDD(jrdd, self, MsgPackDeserializer()) # MsgPackDeserializer PairMUTF8Deserializer

def newHadoopFile(self, name, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString", conf = {}):
def newHadoopFile(self, name, inputFormat, keyClass, valueClass, keyWrapper="toString", valueWrapper="toString",
conf = {}):
"""
Read a Hadoop file with arbitrary InputFormat, key and value class from HDFS,
a local file system (available on all nodes), or any Hadoop-supported file system URI,
Binary file added python/test_support/data/sfdouble/._SUCCESS.crc
Binary file added python/test_support/data/sfdouble/.part-00000.crc
Binary file added python/test_support/data/sfdouble/.part-00001.crc
Empty file added python/test_support/data/sfdouble/_SUCCESS
Binary file added python/test_support/data/sfdouble/part-00000
Binary file added python/test_support/data/sfdouble/part-00001
Binary file added python/test_support/data/sfint/._SUCCESS.crc
Binary file added python/test_support/data/sfint/.part-00000.crc
Binary file added python/test_support/data/sfint/.part-00001.crc
Empty file added python/test_support/data/sfint/_SUCCESS
Binary file added python/test_support/data/sfint/part-00000
Binary file added python/test_support/data/sfint/part-00001
Binary file added python/test_support/data/sftext/.part-00000.crc
Binary file added python/test_support/data/sftext/.part-00001.crc
Binary file added python/test_support/data/sftext/part-00000
Binary file added python/test_support/data/sftext/part-00001
