Skip to content

Example ingestion using Apache Gearpump

Piotr Grabuszynski edited this page May 17, 2016 · 13 revisions

Wiki home > Apache Gearpump

Example ingestion

ingestion-ws-kafka-gearpump-hbase is an Apache Gearpump application hosted in TAP repository

This approach is taken to emphasize that existing ingestions can be enhanced easily by dropping in Apache Gearpump computation DAG that could calculate, aggregate, filter data but also allow for parallel execution and stream forking.

The key component of the pipeline is GearPump application (computation DAG) that:

  1. Reads from kafka topic
  2. Splits processing into two streams
  3. One stream just passes the messages to another kafka topic
  4. Second stream:
  • Processes the message (in this example the processing is simple string reversal)
  • Persists messages to HBase

See the diagram and more useful information in project's wiki.

How to deploy and use ingestion-ws-kafka-gearpump-hbase

This example will use also: hbase-java-api-example and twitter-to-kafka applications.

  1. Create kafka instance on platform. In this example we will use twitter-to-kafka app, which needs instance named “kafka-twitter-instance”
    • In TAP console go to marketplace and choose “kafka”
    • Create new instance
  2. Create hbase instance on platform. In this example we will use hbase-java-api-example, which needs instance named “hbase1”
    • In TAP console go to marketplace and choose “hbase”
    • Choose tab “Shared”
    • Create new instance
  3. Deploy applications twitter-to-kafka and hbase-java-api-example (see the documentation of this components) on TAP
    • As a KAFKA_TOPIC in twitter-to-kafka environment variable set for example “topicIn”
  4. Get zookeeper uri
    • In TAP console click Services > Instances,
    • Create key for “hbase1”,
    • Click ”Export Keys” in the right top corner,
    • Click “+ Add to exports” near just created key,
    • At the bottom, find “exported Keys” section, then your instance, credentials and zookeeperUri. Copy this uri:
      • "credentials":{ ... "zookeeperUri": "<<COPY THIS VALUE, example value: cdh-master-0.node.domain.consul:2181,cdh-master-5.node.domain.consul:2181,cdh-master-2.node.domain.consul:2181/kafka>>"}
  5. Now generate output topic table for ingestion-ws-kafka-gearpump-hbase example
    • Go the cdh node, and then cdh-master-0:
    • Create topics (instead “<<ZOOKEEPER_URI>>” paste uri copied in step 4, including ‘/kafka’ suffix)
      • kafka-topics --create --zookeeper <<ZOOKEEPER_URI>> --replication-factor 1 --partitions 1 --topic topicIn
      • kafka-topics --create --zookeeper <<ZOOKEEPER_URI>> --replication-factor 1 --partitions 1 --topic topicOut
    • If you want to check, if topic has been created
      • kafka-topics --list --zookeeper <<ZOOKEEPER_URI>>
  6. Create a table in hbase using hbase-java-api-example
    • curl http://domain.and:port/api/tables -X POST -H "Content-Type: application/json" -d '{"tableName":"pipeline","columnFamilies":["message"]}'
    • where domain.and:port is your real domain, for example: curl http://trustedanalytics.org:80/api/tables -X POST -H "Content-Type: application/json" -d '{"tableName":"pipeline","columnFamilies":["message"]}'
  7. Create GearPump instance or deploy on existing one
    • In TAP console, go to Data Science > GearPump tab,
    • If you don’t have an instance, you can create it right now,
    • Click link “Deploy App” on the right to chosen instance name,
    • Choose gearpump application jar (from ingestion-ws-kafka-gearpump-hbase/gearpump),
    • Add extra parameters:
      • inputTopic – topicIn
      • outputTopic – topicOut
      • tableName – pipeline
      • columnFamily – message
    • Check hbase instance called “hbase1” and kafka instance “kafka-twitter-instance” from the list and deploy the application.
  8. Check if the information flow is working – tweets should be visible in hbase:
    • curl http://domain.and:port/api/tables/pipeline/head
  9. You can also check output kafka topic:
    • Go to cdh and cdh-master-0, then use command (instead “<<ZOOKEEPER_URI>>” paste uri copied in step 4): kafka-console-consumer --zookeeper <<ZOOKEEPER_URI>> --topic topicin --from-beginning
Clone this wiki locally