Kai Waehner, 22 Jul 2019
This script assumes that all components (Zookeeper, Kafka, Connect, MQTT Broker) use default values.
Please note that the Confluent CLI changed with Confluent Platform 5.3: use 'confluent local start' on 5.3 and later, and 'confluent start' on 5.2 and earlier. This guide was tested with Confluent Platform 5.4.0.
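If you wrap these steps in your own script, the version rule above can be encoded in a small helper. This is only a sketch: the function name confluent_start_cmd and the idea of passing the version as an argument are my assumptions, and it covers nothing beyond the rule stated above.

```shell
# Print the CLI start command that matches a Confluent Platform version.
# Hypothetical helper: expects a version string such as "5.4.0".
confluent_start_cmd() (
  version="$1"
  major="${version%%.*}"      # text before the first dot
  rest="${version#*.}"        # text after the first dot
  minor="${rest%%.*}"         # second version component
  if [ "$major" -gt 5 ] || { [ "$major" -eq 5 ] && [ "$minor" -ge 3 ]; }; then
    echo "confluent local start"   # 5.3 and later
  else
    echo "confluent start"         # 5.2 and earlier
  fi
)
```

With this in place, `$(confluent_start_cmd 5.4.0) connect` expands to the start command used later in this guide.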
Start the MQTT Broker and test publish / subscribe with 'dummy' topic:
brew services start mosquitto
mosquitto_sub -h 127.0.0.1 -t dummy
mosquitto_pub -h 127.0.0.1 -t dummy -m "Hello world"
Make sure the Confluent bin folder is on your PATH. Otherwise, go to $CONFLUENT_INSTALL/bin to execute these commands.
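The PATH check can also be done in the script itself. The following is a minimal sketch, assuming CONFLUENT_INSTALL points at your installation folder; the helper name resolve_confluent_cmd is made up for this example.

```shell
# Resolve a Confluent command: prefer PATH, fall back to $CONFLUENT_INSTALL/bin.
# Prints the command to run, or fails with status 1 if it cannot be found.
resolve_confluent_cmd() (
  name="$1"
  if command -v "$name" >/dev/null 2>&1; then
    echo "$name"
  elif [ -n "${CONFLUENT_INSTALL:-}" ] && [ -x "$CONFLUENT_INSTALL/bin/$name" ]; then
    echo "$CONFLUENT_INSTALL/bin/$name"
  else
    echo "$name not found on PATH or in \$CONFLUENT_INSTALL/bin" >&2
    exit 1   # leaves only the subshell that forms the function body
  fi
)
```

For example, `"$(resolve_confluent_cmd kafka-topics)" --list --bootstrap-server localhost:9092` works from any directory as long as one of the two locations contains the tool.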
Start Kafka Connect and dependencies (Kafka, Zookeeper, Schema Registry):
confluent local start connect
We configure Kafka Connect in distributed mode via a REST call (only possible while Connect is running). This is highly recommended, because the same approach works for one single node (as in this example) and for a distributed Connect cluster.
Important: If you copy and paste the curl command below, make sure that you copy it as plain text only. Otherwise, it might not work. On my Mac, I copy the command to a text editor first and from there to the command line to execute it.
curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{ "name" : "mqtt-source", "config" : { "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector", "tasks.max" : "1", "mqtt.server.uri" : "tcp://127.0.0.1:1883", "mqtt.topics" : "temperature", "kafka.topic" : "mqtt.temperature", "confluent.topic.bootstrap.servers": "localhost:9092", "confluent.topic.replication.factor": "1", "confluent.license":"" } }'
Please note that "distributed mode" can also be used for one single Connect instance, as is the case when using the Confluent CLI. You could also use standalone mode (see more details at the end of the file).
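One way to avoid copy and paste problems with the long one-line command is to keep the JSON body in a file and let curl read it. The sketch below writes the same connector definition as the curl call above; the function names and the file name passed to them are arbitrary choices for this example.

```shell
# Write the connector definition from the curl call above to the file "$1".
write_connector_config() {
  cat > "$1" <<'EOF'
{
  "name": "mqtt-source",
  "config": {
    "connector.class": "io.confluent.connect.mqtt.MqttSourceConnector",
    "tasks.max": "1",
    "mqtt.server.uri": "tcp://127.0.0.1:1883",
    "mqtt.topics": "temperature",
    "kafka.topic": "mqtt.temperature",
    "confluent.topic.bootstrap.servers": "localhost:9092",
    "confluent.topic.replication.factor": "1",
    "confluent.license": ""
  }
}
EOF
}

# Register the connector by posting the file (needs a running Connect worker).
register_connector() {
  curl -s -X POST -H 'Content-Type: application/json' \
    --data @"$1" http://localhost:8083/connectors
}
```

Usage would be `write_connector_config mqtt-source.json && register_connector mqtt-source.json`.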
Check if connector is loaded and status is 'RUNNING':
// REST
curl -s "http://localhost:8083/connectors"
curl -s "http://localhost:8083/connectors/mqtt-source/status"
// Delete the connector again (only run this when you want to remove it)
curl -s -X DELETE localhost:8083/connectors/mqtt-source
// Confluent CLI
confluent local status connectors
confluent local status mqtt-source
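If you want a script to wait for the 'RUNNING' status instead of checking by hand, the status call can be polled. This is a rough sketch with hypothetical helper names. It assumes that the connector-level "state" field is the first "state" in the response, which matches the status output I know from Connect but is fragile; if you have jq installed, `jq -r .connector.state` is the more robust choice.

```shell
# Extract the first "state" value from a Connect status response on stdin.
connector_state() {
  grep -o '"state"[ ]*:[ ]*"[A-Z_]*"' | head -n 1 | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Poll the status endpoint once per second until the connector reports RUNNING.
# $1 = connector name, $2 = maximum number of attempts (default 30).
wait_for_connector() (
  name="$1"
  tries="${2:-30}"
  while [ "$tries" -gt 0 ]; do
    state=$(curl -s "http://localhost:8083/connectors/$name/status" | connector_state)
    [ "$state" = "RUNNING" ] && exit 0
    tries=$((tries - 1))
    sleep 1
  done
  echo "connector $name did not reach RUNNING (last state: ${state:-unknown})" >&2
  exit 1
)
```

A call such as `wait_for_connector mqtt-source 60` then blocks until the connector is up or the attempts are used up.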
Next, create the Kafka topic that receives the MQTT messages from the MQTT connector (the name needs to be the same as 'kafka.topic' in the connector config):
kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mqtt.temperature
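To keep the topic name in sync with the connector config, you could read it from the JSON instead of typing it twice. A small sketch, assuming the JSON body of the connector is available on stdin (for example from a file of your own; the name mqtt-source.json below is hypothetical):

```shell
# Read the "kafka.topic" value from a connector definition on stdin.
topic_from_config() {
  grep -o '"kafka\.topic"[ ]*:[ ]*"[^"]*"' | head -n 1 | sed 's/.*:[ ]*"\([^"]*\)"$/\1/'
}
```

The create command then becomes `kafka-topics --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic "$(topic_from_config < mqtt-source.json)"`.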
Use a CLI tool such as mosquitto_sub from the shell prompt to consume from the MQTT topic (to ensure the MQTT infrastructure works):
$ mosquitto_sub -h 127.0.0.1 -t temperature
99999,4.193955593608823
99999,5.363750640274894
99999,7.292092517069437
[...]
Use a CLI tool such as kafka-console-consumer from the shell prompt to consume from the Kafka topic:
$ kafka-console-consumer --bootstrap-server localhost:9092 --topic mqtt.temperature --property print.key=true --from-beginning
99999,4.193955593608823
99999,5.363750640274894
99999,7.292092517069437
[...]
Send an MQTT message to the MQTT Broker, using mosquitto_pub:
mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10#"
// Send fraud messages. These are routed to a different Kafka topic only if the connector config includes a routing SMT (Single Message Transform); the config shown above does not define one.
mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "fraud"
mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "99999,2.10# 2.13# 2.19# 2.28# 2.44# 2.62# 2.80# 3.04# 3.36# 3.69# 3.97# 4.24# 4.53#4.80# 5.02# 5.21# 5.40# 5.57# 5.71# 5.79# 5.86# 5.92# 5.98# 6.02# 6.06# 6.08# 6.14# 6.18# 6.22# 6.27#6.32# 6.35# 6.38# 6.45# 6.49# 6.53# 6.57# 6.64# 6.70# 6.73# 6.78# 6.83# 6.88# 6.92# 6.94# 6.98# 7.01#7.03# 7.05# 7.06# 7.07# 7.08# 7.06# 7.04# 7.03# 6.99# 6.94# 6.88# 6.83# 6.77# 6.69# 6.60# 6.53# 6.45#6.36# 6.27# 6.19# 6.11# 6.03# 5.94# 5.88# 5.81# 5.75# 5.68# 5.62# 5.61# 5.54# 5.49# 5.45# 5.42# 5.38#5.34# 5.31# 5.30# 5.29# 5.26# 5.23# 5.23# 5.22# 5.20# 5.19# 5.18# 5.19# 5.17# 5.15# 5.14# 5.17# 5.16#5.15# 5.15# 5.15# 5.14# 5.14# 5.14# 5.15# 5.14# 5.14# 5.13# 5.15# 5.15# 5.15# 5.14# 5.16# 5.15# 5.15#5.14# 5.14# 5.15# 5.15# 5.14# 5.13# 5.14# 5.14# 5.11# 5.12# 5.12# 5.12# 5.09# 5.09# 5.09# 5.10# 5.08# 5.08# 5.08# 5.08# 5.06# 5.05# 5.06# 5.07# 5.05# 5.03# 5.03# 5.04# 5.03# 5.01# 5.01# 5.02# 5.01# 5.01#5.00# 5.00# 5.02# 5.01# 4.98# 5.00# 5.00# 5.00# 4.99# 5.00# 5.01# 5.02# 5.01# 5.03# 5.03# 5.02# 5.02#5.04# 5.04# 5.04# 5.02# 5.02# 5.01# 4.99# 4.98# 4.96# 4.96# 4.96# 4.94# 4.93# 4.93# 4.93# 4.93# 4.93# 5.02# 5.27# 5.80# 5.94# 5.58# 5.39# 5.32# 5.25# 5.21# 5.13# 4.97# 4.71# 4.39# 4.05# 3.69# 3.32# 3.05#2.99# 2.74# 2.61# 2.47# 2.35# 2.26# 2.20# 2.15# 2.10# 2.08"
Now run a script to generate and publish a continuous stream of MQTT messages:
./sensor_generator.sh
Stop the sensor_generator script with Control-C.
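In case you do not have the generator script at hand, here is a hypothetical stand-in. It is NOT the author's sensor_generator.sh; it only imitates the "99999,<number>" shape of the sample output above, and the function names, the one-second interval and the value range are my assumptions.

```shell
# Hypothetical stand-in for a sensor generator (not the original script).
# Build one message: $1 = leading field, $2 = seed for the pseudo-random value.
make_reading() {
  awk -v prefix="$1" -v seed="$2" \
    'BEGIN { srand(seed); printf "%s,%.15f\n", prefix, rand() * 10 }'
}

# Publish one message per second to the MQTT topic until stopped with Control-C.
run_generator() (
  i=0
  while true; do
    i=$((i + 1))   # the loop counter doubles as the seed, so values vary per message
    mosquitto_pub -h 127.0.0.1 -p 1883 -t temperature -q 2 -m "$(make_reading 99999 "$i")"
    sleep 1
  done
)
```

Calling `run_generator` starts the stream; the mosquitto_pub flags are the same ones used for the single messages above.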
Stop the Kafka and MQTT consumers with Control-C.
Finally, stop the backend services (note that 'confluent local destroy' also deletes the data of the local Confluent run):
brew services stop mosquitto
confluent local destroy
Here are some helpful commands if you have problems starting mosquitto or finding out if it is running and on which port:
// If brew does not work (or you are not on Mac):
// Start Mosquitto
/usr/local/sbin/mosquitto
// Start Mosquitto (being in PATH) with a specific config file
mosquitto -c /usr/local/etc/mosquitto/mosquitto.conf
// Mosquitto Broker running? => Find process:
ps -ef | grep mosquitto
// Which port?
lsof -n -P -i4|grep LISTEN
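The lsof output is wide, so a small filter helps to see only the process name and port. This is a sketch that assumes the usual lsof column layout, where the last two fields of a listening socket are the address and "(LISTEN)"; the function name is made up.

```shell
# Reduce lsof output (stdin) to "command port" pairs for listening sockets.
listening_ports() {
  awk '$NF == "(LISTEN)" { n = split($(NF - 1), parts, ":"); print $1, parts[n] }'
}
```

For example, `lsof -n -P -i4 | listening_ports | grep mosquitto` should print a line such as "mosquitto 1883" when the broker listens on the default port.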
Logging is a worker-level configuration, so there’s no way to set it per connector instance.
To adjust the log levels, modify the connect-log4j.properties file and restart your worker.
Default log level is ERROR. You can change to INFO or DEBUG.
vi /Users/kai.waehner/confluent-5.1.0/etc/kafka/connect-log4j.properties
// => Change log level
// Restart Kafka Connect worker
confluent local stop connect
confluent local start connect
// Check the Connect log:
confluent local log connect
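If you prefer not to edit the file by hand, the level change can be scripted. This is a minimal sketch, assuming the file sets the level on a line that starts with log4j.rootLogger= followed by the level (check your own file first); the function name is hypothetical.

```shell
# Rewrite the level on the log4j.rootLogger line of a properties file.
# $1 = path to connect-log4j.properties, $2 = new level (e.g. INFO or DEBUG).
# A temp file is used because in-place sed flags differ between macOS and Linux.
set_root_log_level() (
  file="$1"
  level="$2"
  tmp="$file.tmp.$$"
  sed "s/^\(log4j\.rootLogger=\)[A-Z]*/\1$level/" "$file" > "$tmp" && mv "$tmp" "$file"
)
```

After `set_root_log_level /path/to/connect-log4j.properties DEBUG`, restart the Connect worker as shown above so the new level takes effect.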
As an alternative to an HTTP call, you could also configure a properties file for standalone mode. However, as REST can also be used for one single instance (in distributed mode), I always use the REST option to configure Kafka Connect. Here is some more information about using a properties file instead of HTTP to configure Connect (in standalone mode).
Configure the MQTT Connector properties file (for Kafka Connect standalone mode) /Users/kai.waehner/confluent-5.0.0/share/confluent-hub-components/confluentinc-kafka-connect-mqtt/etc/source-anonymous.properties with your values for
- MQTT Broker URL
- MQTT Topic(s) to consume from (comma-separated list)
- Kafka Topic Mapping (prefix)
For example, if you want to consume the MQTT topics temperature and humidity, and you want the Kafka topics to be mqtt.temperature and mqtt.humidity, then do
"mqtt.topics" : "temperature,humidity",
"kafka.topic" : "mqtt.temperature"
This needs to be done before you start Kafka Connect. Example:
name=mqtt-source
tasks.max=1
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
mqtt.server.uri=tcp://127.0.0.1:32790
mqtt.topics=temperature
kafka.topic=mqtt.temperature
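Since both modes take the same settings, a properties file like the one above can be turned into the JSON body for the REST call. The following is only a sketch with a made-up function name: it treats the "name" entry as the connector name, puts every other key=value line under "config", and does not escape quotes or backslashes inside values.

```shell
# Convert a connector properties file ($1) into a JSON body for POST /connectors.
props_to_json() {
  awk -F= '
    /^[ \t]*(#|$)/ { next }                  # skip comments and blank lines
    {
      key = $1
      val = substr($0, length($1) + 2)       # everything after the first "="
      if (key == "name") { name = val; next }
      cfg = cfg sep "\"" key "\": \"" val "\""
      sep = ", "
    }
    END { printf "{ \"name\": \"%s\", \"config\": { %s } }\n", name, cfg }
  ' "$1"
}
```

You could then post the result with `props_to_json source-anonymous.properties | curl -s -X POST -H 'Content-Type: application/json' --data @- http://localhost:8083/connectors` once Connect is running in distributed mode.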