Skip to content

Example ingestion using Apache Gearpump

Daniel Smith edited this page Sep 1, 2016 · 13 revisions

Wiki home > Apache Gearpump

Example ingestion

ingestion-ws-kafka-gearpump-hbase is an Apache Gearpump application hosted in the TAP repository. It extends another example, ingestion-ws-kafka-hdfs.

This example shows that existing ingestions can be enhanced easily by dropping in an Apache Gearpump computation DAG that could calculate, aggregate, and filter data, while also allowing for parallel execution and stream forking.

The key component of the pipeline is the Gearpump application (computation DAG), which:

  1. Reads from a Kafka topic
  2. Splits processing into two streams
  3. One stream simply passes the messages to another Kafka topic
  4. The second stream:
  • Processes the message (in this example, the processing is simple string reversal)
  • Persists messages to HBase

See the diagram and description in the project's wiki.

How to deploy and use ingestion-ws-kafka-gearpump-hbase

This example also uses: hbase-java-api-example and twitter-to-kafka applications.

This ingestion demo extends the https://github.com/intel-data/ingestion-ws-kafka-hdfs example. The latter demo assumes that data is acquired through WebSockets and sent to Kafka (https://github.com/intel-data/ingestion-ws-kafka-hdfs/tree/master/ws2kafka). Refer to the project pages to see how to deploy and configure ws2kafka.

The following instructions present a variation of the example that reads Twitter messages and feeds them to Kafka (twitter-to-kafka replaces ws2kafka).

  1. Create a Kafka instance on the platform. In this example, we are using the twitter-to-kafka app, which needs its instance named “kafka-twitter-instance”
    • In the TAP console, navigate to Services > Marketplace and choose “kafka”.
    • Click Create a new instance.
  2. Create an HBase instance on the platform. In this example, we are using the hbase-java-api-example, which needs an instance named “hbase1”
    • In the TAP console, navigate to Services > Marketplace and choose “hbase”.
    • Choose the Shared tab.
    • Click Create a new instance.
  3. Deploy both applications, twitter-to-kafka and hbase-java-api-example, (see the documentation for these components) on TAP.
    • As a KAFKA_TOPIC in the twitter-to-kafka environment variable set, for example “topicIn”
  4. Get the zookeeper uri:
    • In the TAP console, select Services > Instances,
    • Create a key for “hbase1”,
    • Click ”Export Keys” in the upper right corner,
    • Near the key that was just created, click + Add to exports,
    • At the bottom of the screen, find the “exported Keys” section, then your instance, credentials, and the zookeeperUri. Copy this uri:
      • "credentials":{ ... "zookeeperUri": "<<COPY THIS VALUE, example value: cdh-master-0.node.domain.consul:2181,cdh-master-5.node.domain.consul:2181,cdh-master-2.node.domain.consul:2181/kafka>>"}
  5. Now generate the output topic table for the ingestion-ws-kafka-gearpump-hbase example:
    • Go the cdh node, and then to cdh-master-0:
    • Create topics (instead “<<ZOOKEEPER_URI>>” paste uri copied in step 4, including ‘/kafka’ suffix)
      • kafka-topics --create --zookeeper <<ZOOKEEPER_URI>> --replication-factor 1 --partitions 1 --topic topicIn
      • kafka-topics --create --zookeeper <<ZOOKEEPER_URI>> --replication-factor 1 --partitions 1 --topic topicOut
    • To verify that the topic has been created:
      • kafka-topics --list --zookeeper <<ZOOKEEPER_URI>>
  6. Create a table in HBase using the hbase-java-api-example:
    • curl http://domain.and:port/api/tables -X POST -H "Content-Type: application/json" -d '{"tableName":"pipeline","columnFamilies":["message"]}'
    • where domain.and:port is your real domain, for example: curl http://trustedanalytics.org:80/api/tables -X POST -H "Content-Type: application/json" -d '{"tableName":"pipeline","columnFamilies":["message"]}'
  7. Create a GearPump instance or deploy an existing one:
    • In the TAP console, select Data Science > GearPump,
    • (If you do not have an instance, create one now),
    • Click the Deploy application link to the right of the chosen instance name,
    • Choose the Gearpump application jar (from ingestion-ws-kafka-gearpump-hbase/gearpump),
    • Add extra parameters:
      • inputTopic – topicIn
      • outputTopic – topicOut
      • tableName – pipeline
      • columnFamily – message
    • Check the HBase instance “hbase1” and the Kafka instance “kafka-twitter-instance” from the list and deploy the application.
  8. Verify that the information flow is working – tweets should be visible in HBase:
    • curl http://domain.and:port/api/tables/pipeline/head
  9. You can also check the output Kafka topic:
    • Go to cdh and cdh-master-0, then use the command (instead “<<ZOOKEEPER_URI>>” paste uri copied in step 4): kafka-console-consumer --zookeeper <<ZOOKEEPER_URI>> --topic topicin --from-beginning
Clone this wiki locally