Pegasus can send files with SCP:
- Mock producer:
peg scp to-rem kafka-cluster 1 csv_producer.py csv_producer.py
- Test data:
peg scp to-rem kafka-cluster 1 test-tiny test-tiny
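The two transfers above can also be sketched as a loop; a dry run that only prints the commands (drop the echo to actually copy), assuming peg is on your PATH:

```shell
# Dry-run sketch: copy both files from this section to node 1 of kafka-cluster.
for f in csv_producer.py test-tiny; do
  echo peg scp to-rem kafka-cluster 1 "$f" "$f"   # remove 'echo' to run for real
done
```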
Before we can publish to Kafka, the topics must be created. Kafka can also auto-create topics, but that feature is not used here.
SSH into Kafka: peg ssh kafka-cluster 1
Then run the following command:
/usr/local/kafka/bin/kafka-topics.sh \
--create --zookeeper localhost:2181 \
--replication-factor 3 \
--partitions 10 \
--topic price
Note the Kafka path, replication factor, partition count, and topic name. You can see the existing topics with: /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
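To check that the replication factor and partition count actually took effect, kafka-topics.sh also has a --describe mode. A dry-run sketch that just prints the command to run on the Kafka node:

```shell
# Dry run: print the describe command for the 'price' topic created above.
echo "/usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic price"
```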
All commands here call the scripts that ship with Kafka and take a "localhost" argument, so you have to run them from a shell on your Kafka node; it is assumed that a ZooKeeper instance is running on the same node. If you followed the Kafka setup instructions, you can do peg ssh kafka-cluster 1 and type these commands there.
/usr/local/kafka/bin/kafka-topics.sh \
--create --zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test
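If you create topics often, the two create commands above can be wrapped in a small helper. The function name and defaults below are illustrative, not part of Kafka or Pegasus, and it echoes the command (dry run) rather than running it:

```shell
KAFKA_BIN=/usr/local/kafka/bin

# Hypothetical helper: print a kafka-topics.sh create command.
# Usage: create_topic <name> [partitions] [replication-factor]
create_topic() {
  topic=$1; partitions=${2:-1}; replication=${3:-1}
  echo "$KAFKA_BIN/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor $replication --partitions $partitions --topic $topic"
}

create_topic test          # 1 partition, replication factor 1, as above
create_topic price 10 3    # matches the 'price' topic created earlier
```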
(based on official docs)
To list topics: /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181
Interactive console producer: /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Or pipe a message in as a single command: echo "my test message" | /usr/local/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
To read messages: /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
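The producer and consumer commands above can be combined into a quick round-trip check. The sketch below only prints the two commands (a dry run, so it can be reviewed before running on the cluster); --max-messages is a standard console-consumer flag that makes it exit after one message instead of tailing forever:

```shell
KAFKA_BIN=/usr/local/kafka/bin

# Dry run: build the produce and consume commands for a one-message round trip.
produce="echo 'round-trip check' | $KAFKA_BIN/kafka-console-producer.sh --broker-list localhost:9092 --topic test"
consume="$KAFKA_BIN/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --max-messages 1"
printf '%s\n%s\n' "$produce" "$consume"
```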
To delete a topic: /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test
You will get a warning that the deletion has no effect unless delete.topic.enable is set. This is fine: the Pegasus install script appears to set it (if in doubt, verify by listing topics after deleting).
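That verification step can be scripted. The sketch below simulates the topic listing with sample output so it runs anywhere; on the cluster, replace the printf with the kafka-topics.sh --list command shown above:

```shell
# Simulated topic listing (stand-in for kafka-topics.sh --list output).
topics=$(printf 'price\n__consumer_offsets\n')

# Check whether 'test' still appears in the listing after deletion.
if printf '%s\n' "$topics" | grep -qx test; then
  echo "topic test still exists"
else
  echo "topic test deleted"
fi
```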