From 2c1851318c0cc6ceb3fdf35204da1971a7f30bc2 Mon Sep 17 00:00:00 2001 From: Nick Pentreath Date: Wed, 4 Jun 2014 15:56:45 +0200 Subject: [PATCH] Add examples for reading HBase and Cassandra InputFormats from Python --- .../src/main/python/cassandra_inputformat.py | 79 +++++++++++++++++++ examples/src/main/python/hbase_inputformat.py | 71 +++++++++++++++++ .../spark/examples/CassandraCQLTest.scala | 17 ++++ .../org/apache/spark/examples/HBaseTest.scala | 11 ++- 4 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 examples/src/main/python/cassandra_inputformat.py create mode 100644 examples/src/main/python/hbase_inputformat.py diff --git a/examples/src/main/python/cassandra_inputformat.py b/examples/src/main/python/cassandra_inputformat.py new file mode 100644 index 0000000000000..61424a7a0477b --- /dev/null +++ b/examples/src/main/python/cassandra_inputformat.py @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Create data in Cassandra fist +(following: https://wiki.apache.org/cassandra/GettingStarted) + +cqlsh> CREATE KEYSPACE test + ... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }; +cqlsh> use test; +cqlsh:test> CREATE TABLE users ( + ... user_id int PRIMARY KEY, + ... fname text, + ... lname text + ... ); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1745, 'john', 'smith'); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1744, 'john', 'doe'); +cqlsh:test> INSERT INTO users (user_id, fname, lname) + ... VALUES (1746, 'john', 'smith'); +cqlsh:test> SELECT * FROM users; + + user_id | fname | lname +---------+-------+------- + 1745 | john | smith + 1744 | john | doe + 1746 | john | smith +""" +if __name__ == "__main__": + if len(sys.argv) != 4: + print >> sys.stderr, """ + Usage: cassandra_inputformat + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py + Assumes you have some data in Cassandra already, running on , in and + """ + exit(-1) + + host = sys.argv[1] + keyspace = sys.argv[2] + cf = sys.argv[3] + sc = SparkContext(appName="HBaseInputFormat") + + conf = {"cassandra.input.thrift.address":host, + "cassandra.input.thrift.port":"9160", + "cassandra.input.keyspace":keyspace, + "cassandra.input.columnfamily":cf, + "cassandra.input.partitioner.class":"Murmur3Partitioner", + "cassandra.input.page.row.size":"3"} + cass_rdd = sc.newAPIHadoopRDD( + "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat", + "java.util.Map", + "java.util.Map", + keyConverter="org.apache.spark.examples.CassandraCQLKeyConverter", + valueConverter="org.apache.spark.examples.CassandraCQLValueConverter", + conf=conf) + output = cass_rdd.collect() + for (k, v) in output: + print (k, v) diff --git a/examples/src/main/python/hbase_inputformat.py b/examples/src/main/python/hbase_inputformat.py new file mode 100644 index 0000000000000..a463b25e56bc0 --- /dev/null +++ b/examples/src/main/python/hbase_inputformat.py @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import sys + +from pyspark import SparkContext + +""" +Create test data in HBase first: + +hbase(main):016:0> create 'test', 'f1' +0 row(s) in 1.0430 seconds + +hbase(main):017:0> put 'test', 'row1', 'f1', 'value1' +0 row(s) in 0.0130 seconds + +hbase(main):018:0> put 'test', 'row2', 'f1', 'value2' +0 row(s) in 0.0030 seconds + +hbase(main):019:0> put 'test', 'row3', 'f1', 'value3' +0 row(s) in 0.0050 seconds + +hbase(main):020:0> put 'test', 'row4', 'f1', 'value4' +0 row(s) in 0.0110 seconds + +hbase(main):021:0> scan 'test' +ROW COLUMN+CELL + row1 column=f1:, timestamp=1401883411986, value=value1 + row2 column=f1:, timestamp=1401883415212, value=value2 + row3 column=f1:, timestamp=1401883417858, value=value3 + row4 column=f1:, timestamp=1401883420805, value=value4 +4 row(s) in 0.0240 seconds +""" +if __name__ == "__main__": + if len(sys.argv) != 3: + print >> sys.stderr, """ + Usage: hbase_inputformat + + Run with example jar: + ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py
+ Assumes you have some data in HBase already, running on , in
+ """ + exit(-1) + + host = sys.argv[1] + table = sys.argv[2] + sc = SparkContext(appName="HBaseInputFormat") + + conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table} + hbase_rdd = sc.newAPIHadoopRDD("org.apache.hadoop.hbase.mapreduce.TableInputFormat", + "org.apache.hadoop.hbase.io.ImmutableBytesWritable", + "org.apache.hadoop.hbase.client.Result", + valueConverter="org.apache.spark.examples.HBaseConverter", + conf=conf) + output = hbase_rdd.collect() + for (k, v) in output: + print (k, v) diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 9a00701f985f0..73555270c83c3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -32,6 +32,23 @@ import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ +import org.apache.spark.api.python.Converter + +class CassandraCQLKeyConverter extends Converter { + import collection.JavaConversions._ + override def convert(obj: Any) = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb))) + } +} + +class CassandraCQLValueConverter extends Converter { + import collection.JavaConversions._ + override def convert(obj: Any) = { + val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] + mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb))) + } +} /* Need to create following keyspace and column family in cassandra before running this example diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala index a8c338480e6e2..4be364cbd5f3b 100644 --- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala @@ -17,12 +17,21 @@ package org.apache.spark.examples -import org.apache.hadoop.hbase.client.HBaseAdmin +import org.apache.hadoop.hbase.client.{Result, HBaseAdmin} import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor} import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.spark._ import org.apache.spark.rdd.NewHadoopRDD +import org.apache.spark.api.python.Converter +import org.apache.hadoop.hbase.util.Bytes + +class HBaseConverter extends Converter { + override def convert(obj: Any) = { + val result = obj.asInstanceOf[Result] + Bytes.toStringBinary(result.value()) + } +} object HBaseTest { def main(args: Array[String]) {