diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py index 270c30b096105..39fa6b0d22ef5 100644 --- a/examples/src/main/python/cassandra_inputformat.py +++ b/examples/src/main/python/cassandra_inputformat.py @@ -71,8 +71,8 @@ "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat", "java.util.Map", "java.util.Map", - keyConverter="org.apache.spark.examples.CassandraCQLKeyConverter", - valueConverter="org.apache.spark.examples.CassandraCQLValueConverter", + keyConverter="org.apache.spark.examples.pythonconverters.CassandraCQLKeyConverter", + valueConverter="org.apache.spark.examples.pythonconverters.CassandraCQLValueConverter", conf=conf) output = cass_rdd.collect() for (k, v) in output: diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py index a463b25e56bc0..3289d9880a0f5 100644 --- a/examples/src/main/python/hbase_inputformat.py +++ b/examples/src/main/python/hbase_inputformat.py @@ -61,11 +61,12 @@ sc = SparkContext(appName="HBaseInputFormat") conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} - hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", - "org.apache.hadoop.hbase.io.ImmutableBytesWritable", - "org.apache.hadoop.hbase.client.Result", - valueConverter="org.apache.spark.examples.HBaseConverter", - conf=conf) + hbase_rdd = sc.newAPIHadoopRDD( + "org.apache.hadoop.hbase.mapreduce.TableInputFormat", + "org.apache.hadoop.hbase.io.ImmutableBytesWritable", + "org.apache.hadoop.hbase.client.Result", + valueConverter="org.apache.spark.examples.pythonconverters.HBaseConverter", + conf=conf) output = hbase_rdd.collect() for (k, v) in output: print (k, v) diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 73555270c83c3..71f53af68f4d3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -32,23 +32,7 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ -import org.apache.spark.api.python.Converter -class CassandraCQLKeyConverter extends Converter { - import collection.JavaConversions._ - override def convert(obj: Any) = { - val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] - mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb))) - } -} - -class CassandraCQLValueConverter extends Converter { - import collection.JavaConversions._ - override def convert(obj: Any) = { - val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] - mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb))) - } -} /* Need to create following keyspace and column family in cassandra before running this example diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index 4be364cbd5f3b..4893b017ed819 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -17,21 +17,12 @@ package org.apache.spark.examples -import org.apache.hadoop.hbase.client.{Result, HBaseAdmin} +import org.apache.hadoop.hbase.client.HBaseAdmin import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark._ -import org.apache.spark.rdd.NewHadoopRDD -import org.apache.spark.api.python.Converter -import org.apache.hadoop.hbase.util.Bytes - -class HBaseConverter extends Converter { - override def convert(obj: Any) = { - val result = obj.asInstanceOf[Result] - Bytes.toStringBinary(result.value()) - } -} + object HBaseTest { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala new file mode 100644 index 0000000000000..0275c695c0cfa --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala @@ -0,0 +1,21 @@ +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import java.nio.ByteBuffer +import org.apache.cassandra.utils.ByteBufferUtil +import collection.JavaConversions.{mapAsJavaMap, mapAsScalaMap} + + +class CassandraCQLKeyConverter extends Converter { + override def convert(obj: Any) = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb))) + } +} + +class CassandraCQLValueConverter extends Converter { + override def convert(obj: Any) = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb))) + } +} diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala new file mode 100644 index 0000000000000..02c724c5c718c --- /dev/null +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverter.scala @@ -0,0 +1,12 @@ +package org.apache.spark.examples.pythonconverters + +import org.apache.spark.api.python.Converter +import org.apache.hadoop.hbase.client.Result +import org.apache.hadoop.hbase.util.Bytes + +class HBaseConverter extends Converter { + override def convert(obj: Any) = { + val result = obj.asInstanceOf[Result] + Bytes.toStringBinary(result.value()) + } +}