Commit

Move Converter examples to own package

MLnick committed Jun 5, 2014
1 parent 365d0be commit a985492
Showing 6 changed files with 43 additions and 34 deletions.
4 changes: 2 additions & 2 deletions examples/src/main/python/cassandra_inputformat.py
@@ -71,8 +71,8 @@
         "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat",
         "java.util.Map",
         "java.util.Map",
-        keyConverter="org.apache.spark.examples.CassandraCQLKeyConverter",
-        valueConverter="org.apache.spark.examples.CassandraCQLValueConverter",
+        keyConverter="org.apache.spark.examples.pythonconverters.CassandraCQLKeyConverter",
+        valueConverter="org.apache.spark.examples.pythonconverters.CassandraCQLValueConverter",
         conf=conf)
     output = cass_rdd.collect()
     for (k, v) in output:
11 changes: 6 additions & 5 deletions examples/src/main/python/hbase_inputformat.py
@@ -61,11 +61,12 @@
     sc = SparkContext(appName="HBaseInputFormat")

     conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
-    hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat",
-        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
-        "org.apache.hadoop.hbase.client.Result",
-        valueConverter="org.apache.spark.examples.HBaseConverter",
-        conf=conf)
+    hbase_rdd = sc.newAPIHadoopRDD(
+        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
+        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
+        "org.apache.hadoop.hbase.client.Result",
+        valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter",
+        conf=conf)
     output = hbase_rdd.collect()
     for (k, v) in output:
         print (k, v)
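Note: the keyConverter / valueConverter arguments in these Python examples are fully qualified names of Scala classes implementing org.apache.spark.api.python.Converter, which PySpark instantiates reflectively on the JVM side. A minimal sketch of that pattern, assuming the no-argument Converter trait used throughout this commit (ConverterLoader is a hypothetical helper, not Spark's actual internals):

import org.apache.spark.api.python.Converter

// Hypothetical helper: turn a fully qualified converter name, as passed
// from PySpark, into a live Converter instance via reflection.
object ConverterLoader {
  def load(className: String): Converter =
    Class.forName(className).newInstance().asInstanceOf[Converter]
}

// e.g. ConverterLoader.load("org.apache.spark.examples.pythonconverters.HBaseConverter")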
16 changes: 0 additions & 16 deletions examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -32,23 +32,7 @@ import org.apache.hadoop.mapreduce.Job

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.SparkContext._
-import org.apache.spark.api.python.Converter

-class CassandraCQLKeyConverter extends Converter {
-  import collection.JavaConversions._
-  override def convert(obj: Any) = {
-    val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
-    mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
-  }
-}
-
-class CassandraCQLValueConverter extends Converter {
-  import collection.JavaConversions._
-  override def convert(obj: Any) = {
-    val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
-    mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
-  }
-}

 /*
 Need to create following keyspace and column family in cassandra before running this example
13 changes: 2 additions & 11 deletions examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
@@ -17,21 +17,12 @@

 package org.apache.spark.examples

-import org.apache.hadoop.hbase.client.{Result, HBaseAdmin}
+import org.apache.hadoop.hbase.client.HBaseAdmin
 import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat

 import org.apache.spark._
 import org.apache.spark.rdd.NewHadoopRDD
-import org.apache.spark.api.python.Converter
-import org.apache.hadoop.hbase.util.Bytes
-
-class HBaseConverter extends Converter {
-  override def convert(obj: Any) = {
-    val result = obj.asInstanceOf[Result]
-    Bytes.toStringBinary(result.value())
-  }
-}
-

 object HBaseTest {
   def main(args: Array[String]) {
21 changes: 21 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala
@@ -0,0 +1,21 @@
+package org.apache.spark.examples.pythonconverters
+
+import org.apache.spark.api.python.Converter
+import java.nio.ByteBuffer
+import org.apache.cassandra.utils.ByteBufferUtil
+import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap}
+
+
+class CassandraCQLKeyConverter extends Converter {
+  override def convert(obj: Any) = {
+    val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
+    mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
+  }
+}
+
+class CassandraCQLValueConverter extends Converter {
+  override def convert(obj: Any) = {
+    val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
+    mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
+  }
+}
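For a quick sanity check, the value converter can be exercised off-cluster. A sketch, assuming the Cassandra example dependency (ByteBufferUtil) is on the classpath; CassandraConverterCheck is a hypothetical driver, not part of this commit:

import java.nio.ByteBuffer
import org.apache.cassandra.utils.ByteBufferUtil
import org.apache.spark.examples.pythonconverters.CassandraCQLValueConverter

object CassandraConverterCheck {
  def main(args: Array[String]): Unit = {
    // Simulate one CQL row: a map from column name to raw ByteBuffer value.
    val row = new java.util.HashMap[String, ByteBuffer]()
    row.put("fname", ByteBufferUtil.bytes("john"))
    // The converter decodes each ByteBuffer to a String for the Python side.
    println(new CassandraCQLValueConverter().convert(row)) // expected: {fname=john}
  }
}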
12 changes: 12 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala
@@ -0,0 +1,12 @@
+package org.apache.spark.examples.pythonconverters
+
+import org.apache.spark.api.python.Converter
+import org.apache.hadoop.hbase.client.Result
+import org.apache.hadoop.hbase.util.Bytes
+
+class HBaseConverter extends Converter {
+  override def convert(obj: Any) = {
+    val result = obj.asInstanceOf[Result]
+    Bytes.toStringBinary(result.value())
+  }
+}
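The HBase converter can be checked the same way. A sketch, assuming the 0.94-era HBase API these examples build against, where a Result can be constructed directly from KeyValues; HBaseConverterCheck is a hypothetical driver, not part of this commit:

import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.examples.pythonconverters.HBaseConverter

object HBaseConverterCheck {
  def main(args: Array[String]): Unit = {
    // A single-cell Result: row "r1", family "f1", qualifier "q1", value "v1".
    val kv = new KeyValue(Bytes.toBytes("r1"), Bytes.toBytes("f1"),
      Bytes.toBytes("q1"), Bytes.toBytes("v1"))
    val result = new Result(Array(kv))
    // Result.value() returns the first cell's value; the converter renders
    // it as a printable string for the Python side.
    println(new HBaseConverter().convert(result)) // expected: v1
  }
}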
