Skip to content

Commit

Permalink
Clean up Msgpack serialization and registering
Browse files Browse the repository at this point in the history
  • Loading branch information
MLnick committed Apr 18, 2014
1 parent 7237263 commit a67dfad
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import org.apache.hadoop.conf.Configuration
*/
private[python] object PythonHadoopUtil {

def mapToConf(map: java.util.HashMap[String, String]) = {
/**
 * Builds a new Hadoop `Configuration` and copies every entry of `map` into it.
 *
 * @param map key/value settings; each pair is applied with `Configuration.set`
 * @return a freshly constructed `Configuration` holding the entries of `map`
 */
def mapToConf(map: java.util.Map[String, String]) = {
  // Convert explicitly with asScala rather than relying on the implicit JavaConversions wrappers.
  import scala.collection.JavaConverters._
  val conf = new Configuration()
  map.asScala.foreach { case (k, v) => conf.set(k, v) }
  conf
}

/* Merges two configurations, returns a copy of left with keys from right overwriting any matching keys in left */
/** Merges two configurations, returns a copy of left with keys from right overwriting any matching keys in left */
def mergeConfs(left: Configuration, right: Configuration) = {
import collection.JavaConversions._
val copy = new Configuration(left)
Expand Down
52 changes: 35 additions & 17 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package org.apache.spark.api.python
import org.msgpack.ScalaMessagePack
import scala.util.Try
import org.apache.spark.rdd.RDD
import java.io.Serializable
import org.apache.spark.{SparkContext, Logging}
import org.apache.hadoop.io._
import scala.util.Success
Expand All @@ -15,23 +14,42 @@ import scala.util.Failure
*/
private[python] object SerDeUtil extends Logging {

/** Attempts to register a class with MsgPack, only if it is not a primitive or a String */
def register[T](clazz: Class[T], msgpack: ScalaMessagePack) {
//implicit val kcm = ClassManifest.fromClass(clazz)
//val kc = kcm.erasure
/**
 * Decides whether a value has to be registered with MsgPack before it is serialized.
 *
 * Double, Float, Int, Long, Byte, Char, Boolean, String, Map, Seq and Option values yield `false`:
 * MsgPack serializes them itself, and registering them throws scary looking errors (but still works).
 * Every other value yields `true`.
 */
def needsToBeRegistered[T](t: T) = {
  // True when the value belongs to one of the types that are never registered.
  val handledByMsgPack = t match {
    case _: Double | _: Float | _: Int | _: Long | _: Byte | _: Char | _: Boolean => true
    case _: String => true
    case _: Map[_, _] | _: Seq[_] | _: Option[_] => true
    case _ => false
  }
  !handledByMsgPack
}

/**
 * Attempts to register the runtime class of `t` with MsgPack.
 *
 * Returns immediately when `needsToBeRegistered(t)` is false. Otherwise `msgpack.register` is called on
 * `t.getClass` inside a `Try`; a failure is logged as a warning instead of being rethrown.
 *
 * @param t value whose runtime class should be registered
 * @param msgpack the MsgPack instance to register the class with
 */
def register[T](t: T, msgpack: ScalaMessagePack) {
// Nothing to do for values that needsToBeRegistered reports as not requiring registration.
if (!needsToBeRegistered(t)) {
return
}
// NOTE(review): getClass is evaluated outside the Try below, so an exception raised here
// (e.g. a null `t`) is not handled by this method - confirm callers guard against it.
val clazz = t.getClass
Try {
//if (kc.isInstance("") || kc.isPrimitive) {
// log.info("Class: %s doesn't need to be registered".format(kc.getName))
//} else {
msgpack.register(clazz)
// NOTE(review): this `.format` info line and the interpolated one below carry the same message;
// presumably they are the before/after lines of a diff - confirm which one should remain.
log.info("Registered key/value class with MsgPack: %s".format(clazz))
//}

log.info(s"Registered key/value class with MsgPack: $clazz")
} match {
case Failure(err) =>
// Registration failure is logged, not rethrown.
// NOTE(review): two warn calls with near-identical text follow (the first uses clazz.getName,
// the second interpolates clazz); presumably diff before/after lines - confirm.
log.warn("Failed to register class (%s) with MsgPack. ".format(clazz.getName) +
"Falling back to default MsgPack serialization, or 'toString' as last resort. " +
"Error: %s".format(err.getMessage))
log.warn(s"""Failed to register class ($clazz) with MsgPack.
Falling back to default MsgPack serialization, or 'toString' as last resort.
Error: ${err.getMessage}""")
// Successful registration needs no further handling.
case Success(result) =>
}
}
Expand All @@ -45,8 +63,8 @@ private[python] object SerDeUtil extends Logging {
pairs.map{ pair =>
Try {
if (!triedReg) {
register(pair._1.getClass, mp)
register(pair._2.getClass, mp)
register(pair._1, mp)
register(pair._2, mp)
triedReg = true
}
mp.write(pair)
Expand All @@ -66,7 +84,7 @@ private[python] object SerDeUtil extends Logging {
}

/**
* Converts an RDD of (K, V) pairs, where K and/or V could be instances of [[org.apache.hadoop.io.Writable]],
* Converts an RDD of key-value pairs, where key and/or value could be instances of [[org.apache.hadoop.io.Writable]],
* into an RDD[(K, V)]
*/
def convertRDD[K, V](rdd: RDD[(K, V)]) = {
Expand Down

0 comments on commit a67dfad

Please sign in to comment.