Add examples for reading HBase and Cassandra InputFormats from Python
MLnick committed Jun 4, 2014
1 parent b65606f commit 2c18513
Showing 4 changed files with 177 additions and 1 deletion.
79 changes: 79 additions & 0 deletions examples/src/main/python/cassandra_inputformat.py
@@ -0,0 +1,79 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

from pyspark import SparkContext

"""
Create data in Cassandra first
(following: https://wiki.apache.org/cassandra/GettingStarted)
cqlsh> CREATE KEYSPACE test
... WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
cqlsh> use test;
cqlsh:test> CREATE TABLE users (
... user_id int PRIMARY KEY,
... fname text,
... lname text
... );
cqlsh:test> INSERT INTO users (user_id, fname, lname)
... VALUES (1745, 'john', 'smith');
cqlsh:test> INSERT INTO users (user_id, fname, lname)
... VALUES (1744, 'john', 'doe');
cqlsh:test> INSERT INTO users (user_id, fname, lname)
... VALUES (1746, 'john', 'smith');
cqlsh:test> SELECT * FROM users;
 user_id | fname | lname
---------+-------+-------
    1745 |  john | smith
    1744 |  john |   doe
    1746 |  john | smith
"""
if __name__ == "__main__":
    if len(sys.argv) != 4:
        print >> sys.stderr, """
        Usage: cassandra_inputformat <host> <keyspace> <cf>
        Run with example jar:
        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/cassandra_inputformat.py <host> <keyspace> <cf>
        Assumes you have some data in Cassandra already, running on <host>, in <keyspace> and <cf>
        """
        exit(-1)

    host = sys.argv[1]
    keyspace = sys.argv[2]
    cf = sys.argv[3]
    sc = SparkContext(appName="CassandraInputFormat")

    conf = {"cassandra.input.thrift.address": host,
            "cassandra.input.thrift.port": "9160",
            "cassandra.input.keyspace": keyspace,
            "cassandra.input.columnfamily": cf,
            "cassandra.input.partitioner.class": "Murmur3Partitioner",
            "cassandra.input.page.row.size": "3"}
    cass_rdd = sc.newAPIHadoopRDD(
        "org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat",
        "java.util.Map",
        "java.util.Map",
        keyConverter="org.apache.spark.examples.CassandraCQLKeyConverter",
        valueConverter="org.apache.spark.examples.CassandraCQLValueConverter",
        conf=conf)
    output = cass_rdd.collect()
    for (k, v) in output:
        print (k, v)
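
The converters above hand each CQL row to Python as a pair of dicts keyed by column name, so the result behaves like any ordinary RDD of key/value pairs. A minimal follow-on sketch (not part of this commit), assuming it continues the script above against the sample `users` data:

    keyed = cass_rdd.map(lambda kv: (kv[1]["fname"], 1))
    # Count rows per first name; the three sample rows should yield [('john', 3)]
    fname_counts = keyed.reduceByKey(lambda a, b: a + b)
    print fname_counts.collect()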
71 changes: 71 additions & 0 deletions examples/src/main/python/hbase_inputformat.py
@@ -0,0 +1,71 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

from pyspark import SparkContext

"""
Create test data in HBase first:
hbase(main):016:0> create 'test', 'f1'
0 row(s) in 1.0430 seconds
hbase(main):017:0> put 'test', 'row1', 'f1', 'value1'
0 row(s) in 0.0130 seconds
hbase(main):018:0> put 'test', 'row2', 'f1', 'value2'
0 row(s) in 0.0030 seconds
hbase(main):019:0> put 'test', 'row3', 'f1', 'value3'
0 row(s) in 0.0050 seconds
hbase(main):020:0> put 'test', 'row4', 'f1', 'value4'
0 row(s) in 0.0110 seconds
hbase(main):021:0> scan 'test'
ROW                COLUMN+CELL
 row1              column=f1:, timestamp=1401883411986, value=value1
 row2              column=f1:, timestamp=1401883415212, value=value2
 row3              column=f1:, timestamp=1401883417858, value=value3
 row4              column=f1:, timestamp=1401883420805, value=value4
4 row(s) in 0.0240 seconds
"""
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print >> sys.stderr, """
        Usage: hbase_inputformat <host> <table>
        Run with example jar:
        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/hbase_inputformat.py <host> <table>
        Assumes you have some data in HBase already, running on <host>, in <table>
        """
        exit(-1)

    host = sys.argv[1]
    table = sys.argv[2]
    sc = SparkContext(appName="HBaseInputFormat")

    conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}
    hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        valueConverter="org.apache.spark.examples.HBaseConverter",
        conf=conf)
    output = hbase_rdd.collect()
    for (k, v) in output:
        print (k, v)
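
TableInputFormat reads additional scan properties from the same conf dict. A hedged variation (not part of this commit) that restricts the scan to the f1 column family created above, using the standard TableInputFormat property name:

    conf = {"hbase.zookeeper.quorum": host,
            "hbase.mapreduce.inputtable": table,
            # TableInputFormat.SCAN_COLUMN_FAMILY: only scan this family
            "hbase.mapreduce.scan.column.family": "f1"}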
examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala
@@ -32,6 +32,23 @@ import org.apache.hadoop.mapreduce.Job

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.api.python.Converter

class CassandraCQLKeyConverter extends Converter {
  import collection.JavaConversions._
  override def convert(obj: Any) = {
    val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
    mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb)))
  }
}

class CassandraCQLValueConverter extends Converter {
  import collection.JavaConversions._
  override def convert(obj: Any) = {
    val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
    mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb)))
  }
}
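
The same pattern extends to other CQL column types through the matching ByteBufferUtil decoder. A sketch (not part of this commit; the class name is hypothetical) of a value converter for a table whose columns are CQL bigints:

    class CassandraCQLLongConverter extends Converter {
      import collection.JavaConversions._
      override def convert(obj: Any) = {
        val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]]
        // ByteBufferUtil.toLong decodes the 8-byte CQL bigint encoding
        mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toLong(bb)))
      }
    }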

/*
Need to create the following keyspace and column family in Cassandra before running this example
examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
@@ -17,12 +17,21 @@

package org.apache.spark.examples

import org.apache.hadoop.hbase.client.HBaseAdmin
import org.apache.hadoop.hbase.client.{Result, HBaseAdmin}
import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor}
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

import org.apache.spark._
import org.apache.spark.rdd.NewHadoopRDD
import org.apache.spark.api.python.Converter
import org.apache.hadoop.hbase.util.Bytes

class HBaseConverter extends Converter {
  override def convert(obj: Any) = {
    val result = obj.asInstanceOf[Result]
    Bytes.toStringBinary(result.value())
  }
}
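
A key converter is the natural companion, so that row keys also reach Python as strings. A sketch (not part of this commit; the class name is hypothetical) assuming the usual ImmutableBytesWritable keys, which additionally needs org.apache.hadoop.hbase.io.ImmutableBytesWritable imported:

    class HBaseKeyConverter extends Converter {
      override def convert(obj: Any) = {
        // ImmutableBytesWritable.get() exposes the underlying row key bytes
        val key = obj.asInstanceOf[ImmutableBytesWritable]
        Bytes.toStringBinary(key.get())
      }
    }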

object HBaseTest {
def main(args: Array[String]) {
