Skip to content

Commit

Permalink
Parameterize converter trait
Browse files Browse the repository at this point in the history
  • Loading branch information
MLnick committed Jun 6, 2014
1 parent 5ebacfa commit cde6af9
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ import org.apache.spark.annotation.Experimental
* transformation code by overriding the convert method.
*/
@Experimental
trait Converter {
def convert(obj: Any): Any
trait Converter[T, U] {
def convert(obj: T): U
}

/**
* A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
* Other objects are passed through without conversion.
*/
private[python] object DefaultConverter extends Converter {
private[python] object DefaultConverter extends Converter[Any, Any] {

/**
* Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
Expand Down Expand Up @@ -80,8 +80,8 @@ private[python] object DefaultConverter extends Converter {
*/
private[python] class ConverterRegistry extends Logging {

var keyConverter: Converter = DefaultConverter
var valueConverter: Converter = DefaultConverter
var keyConverter: Converter[Any, Any] = DefaultConverter
var valueConverter: Converter[Any, Any] = DefaultConverter

def convertKey(obj: Any): Any = keyConverter.convert(obj)

Expand All @@ -97,9 +97,9 @@ private[python] class ConverterRegistry extends Logging {
logInfo(s"Loaded and registered value converter ($converterClass)")
}

private def register(converterClass: String): Converter = {
private def register(converterClass: String): Converter[Any, Any] = {
Try {
val converter = Class.forName(converterClass).newInstance().asInstanceOf[Converter]
val converter = Class.forName(converterClass).newInstance().asInstanceOf[Converter[Any, Any]]
converter
} match {
case Success(s) => s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ case class TestWritable(var str: String, var int: Int, var double: Double) exten
}
}

class TestConverter extends Converter {
class TestConverter extends Converter[Any, Any] {
import collection.JavaConversions._
override def convert(obj: Any) = {
val m = obj.asInstanceOf[MapWritable]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,23 @@ import org.apache.cassandra.utils.ByteBufferUtil
import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}


class CassandraCQLKeyConverter extends Converter {
override def convert(obj: Any) = {
/**
* Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
* output to a Map[String, Int]
*/
class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] {
override def convert(obj: Any): java.util.Map[String, Int] = {
val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
}
}

class CassandraCQLValueConverter extends Converter {
override def convert(obj: Any) = {
/**
* Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra
* output to a Map[String, String]
*/
class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] {
override def convert(obj: Any): java.util.Map[String, String] = {
val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ import org.apache.spark.api.python.Converter
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes

class HBaseConverter extends Converter {
override def convert(obj: Any) = {
/**
* Implementation of [[org.apache.spark.api.python.Converter]] that converts a HBase Result
* to a String
*/
class HBaseConverter extends Converter[Any, String] {
override def convert(obj: Any): String = {
val result = obj.asInstanceOf[Result]
Bytes.toStringBinary(result.value())
}
Expand Down

0 comments on commit cde6af9

Please sign in to comment.