From 2ee2913f173199c18af4e48093b054c8fd286626 Mon Sep 17 00:00:00 2001 From: "Christian W. Zuckschwerdt" Date: Wed, 3 Apr 2019 10:24:33 +0000 Subject: [PATCH] Add MQTT output --- README.md | 14 +- conf/rtl_433.example.conf | 15 +- examples/rtl_433_mqtt_hass.py | 318 ++++++++++++++++++++++ include/output_mqtt.h | 19 ++ src/CMakeLists.txt | 1 + src/Makefile.am | 1 + src/output_mqtt.c | 490 ++++++++++++++++++++++++++++++++++ src/rtl_433.c | 26 +- vs15/rtl_433.vcxproj | 2 + vs15/rtl_433.vcxproj.filters | 6 + 10 files changed, 884 insertions(+), 8 deletions(-) create mode 100755 examples/rtl_433_mqtt_hass.py create mode 100644 include/output_mqtt.h create mode 100644 src/output_mqtt.c diff --git a/README.md b/README.md index 1b0895228..3ff2d2ead 100644 --- a/README.md +++ b/README.md @@ -63,7 +63,7 @@ Usage: = General options = [-w | help] Save data stream to output file (a '-' dumps samples to stdout) [-W | help] Save data stream to output file, overwrite existing file = Data output options = - [-F kv | json | csv | syslog | null | help] Produce decoded output in given format. + [-F kv | json | csv | mqtt | syslog | null | help] Produce decoded output in given format. Append output to file with : (e.g. -F csv:log.csv), defaults to stdout. Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514 [-M time | reltime | notime | hires | utc | protocol | level | stats | bits | help] Add various meta data to each output. @@ -206,7 +206,7 @@ Option -d: [-d "" Open default SoapySDR device [-d driver=rtlsdr Open e.g. specific SoapySDR device To set gain for SoapySDR use -g ELEM=val,ELEM=val,... e.g. -g LNA=20,TIA=8,PGA=2 (for LimeSDR). -[-d rtl_tcp[:host[:port]] (default: localhost:1234) +[-d rtl_tcp[:[//]host[:port]] (default: localhost:1234) Specify host/port to connect to with e.g. -d rtl_tcp:127.0.0.1:1234 Option -g: @@ -269,9 +269,17 @@ E.g. -X "n=doorbell,m=OOK_PWM,s=400,l=800,r=7000,g=1000,match={24}0xa9878c,repea Option -F: -[-F kv|json|csv|syslog|null] Produce decoded output in given format. +[-F kv|json|csv|mqtt|syslog|null] Produce decoded output in given format. Without this option the default is KV output. Use "-F null" to remove the default. Append output to file with : (e.g. -F csv:log.csv), defaults to stdout. + Specify MQTT server with e.g. -F mqtt://localhost:1883 + Add MQTT options with e.g. -F "mqtt://host:1883,opt=arg" + MQTT options are: user=foo, pass=bar, retain[=0|1], + usechannel=replaceid|afterid|beforeid|no, [=topic] + Supported MQTT formats: (default is all) + events: posts JSON event data + states: posts JSON state data + devices: posts device and sensor info in nested topics Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514 Option -M: diff --git a/conf/rtl_433.example.conf b/conf/rtl_433.example.conf index c7a0b9e6f..ddeeee6d5 100644 --- a/conf/rtl_433.example.conf +++ b/conf/rtl_433.example.conf @@ -159,9 +159,18 @@ signal_grabber none ## Data output options # as command line option: -# [-F] kv|json|csv|syslog Produce decoded output in given format. Not yet supported by all drivers. -# append output to file with : (e.g. -F csv:log.csv), defaults to stdout. -# specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514 +# [-F kv|json|csv|mqtt|syslog|null] Produce decoded output in given format. +# Without this option the default is KV output. Use "-F null" to remove the default. +# Append output to file with : (e.g. -F csv:log.csv), defaults to stdout. +# Specify MQTT server with e.g. -F mqtt://localhost:1883 +# Add MQTT options with e.g. -F "mqtt://host:1883,opt=arg" +# MQTT options are: user=foo, pass=bar, retain[=0|1], +# usechannel=replaceid|afterid|beforeid|no, [=topic] +# Supported MQTT formats: (default is all) +# events: posts JSON event data +# states: posts JSON state data +# devices: posts device and sensor info in nested topics +# Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514 # default is "kv", multiple outputs can be used. output json diff --git a/examples/rtl_433_mqtt_hass.py b/examples/rtl_433_mqtt_hass.py new file mode 100755 index 000000000..ded675f90 --- /dev/null +++ b/examples/rtl_433_mqtt_hass.py @@ -0,0 +1,318 @@ +#!/usr/bin/env python +# coding=utf-8 + +"""MQTT Home Assistant auto discovery for rtl_433 events.""" + +# It is strongly recommended to run rtl_433 with "-C si" and "-M newmodel". + +# Needs Paho-MQTT https://pypi.python.org/pypi/paho-mqtt + +# Option: PEP 3143 - Standard daemon process library +# (use Python 3.x or pip install python-daemon) +# import daemon + +from __future__ import print_function +from __future__ import with_statement + +import socket +import time +import json +import paho.mqtt.client as mqtt + +MQTT_HOST = "127.0.0.1" +MQTT_PORT = 1883 +MQTT_TOPIC = "rtl_433/+/events" +DISCOVERY_PREFIX = "homeassistant" +DISCOVERY_INTERVAL = 600 # Seconds before refreshing the discovery + +discovery_timeouts = {} + +mappings = { + "temperature_C": { + "device_type": "sensor", + "object_suffix": "T", + "config": { + "device_class": "temperature", + "name": "Temperature", + "unit_of_measurement": "°C", + "value_template": "{{ value_json.temperature_C }}" + } + }, + "temperature_1_C": { + "device_type": "sensor", + "object_suffix": "T1", + "config": { + "device_class": "temperature", + "name": "Temperature 1", + "unit_of_measurement": "°C", + "value_template": "{{ value_json.temperature_1_C }}" + } + }, + "temperature_2_C": { + "device_type": "sensor", + "object_suffix": "T2", + "config": { + "device_class": "temperature", + "name": "Temperature 2", + "unit_of_measurement": "°C", + "value_template": "{{ value_json.temperature_2_C }}" + } + }, + "temperature_F": { + "device_type": "sensor", + "object_suffix": "F", + "config": { + "device_class": "temperature", + "name": "Temperature", + "unit_of_measurement": "°F", + "value_template": "{{ value_json.temperature_F }}" + } + }, + + "battery_ok": { + "device_type": "sensor", + "object_suffix": "B", + "config": { + "device_class": "battery", + "name": "Battery", + "unit_of_measurement": "%", + "value_template": "{{ float(value_json.battery_ok) * 99 + 1 }}" + } + }, + + "humidity": { + "device_type": "sensor", + "object_suffix": "H", + "config": { + "device_class": "humidity", + "name": "Humidity", + "unit_of_measurement": "%", + "value_template": "{{ value_json.humidity }}" + } + }, + + "moisture": { + "device_type": "sensor", + "object_suffix": "H", + "config": { + "device_class": "moisture", + "name": "Moisture", + "unit_of_measurement": "%", + "value_template": "{{ value_json.moisture }}" + } + }, + + "pressure_hPa": { + "device_type": "sensor", + "object_suffix": "P", + "config": { + "device_class": "pressure", + "name": "Pressure", + "unit_of_measurement": "hPa", + "value_template": "{{ value_json.pressure_hPa }}" + } + }, + + "wind_speed_km_h": { + "device_type": "sensor", + "object_suffix": "WS", + "config": { + "device_class": "weather", + "name": "Wind Speed", + "unit_of_measurement": "km/h", + "value_template": "{{ value_json.wind_speed_km_h }}" + } + }, + + "wind_speed_m_s": { + "device_type": "sensor", + "object_suffix": "WS", + "config": { + "device_class": "weather", + "name": "Wind Speed", + "unit_of_measurement": "km/h", + "value_template": "{{ float(value_json.wind_speed_m_s) * 3.6 }}" + } + }, + + "gust_speed_km_h": { + "device_type": "sensor", + "object_suffix": "GS", + "config": { + "device_class": "weather", + "name": "Gust Speed", + "unit_of_measurement": "km/h", + "value_template": "{{ value_json.gust_speed_km_h }}" + } + }, + + "gust_speed_m_s": { + "device_type": "sensor", + "object_suffix": "GS", + "config": { + "device_class": "weather", + "name": "Gust Speed", + "unit_of_measurement": "km/h", + "value_template": "{{ float(value_json.gust_speed_m_s) * 3.6 }}" + } + }, + + "wind_dir_deg": { + "device_type": "sensor", + "object_suffix": "WD", + "config": { + "device_class": "weather", + "name": "Wind Direction", + "unit_of_measurement": "°", + "value_template": "{{ value_json.wind_dir_deg }}" + } + }, + + "rain_mm": { + "device_type": "sensor", + "object_suffix": "RT", + "config": { + "device_class": "weather", + "name": "Rain Total", + "unit_of_measurement": "mm", + "value_template": "{{ value_json.rain_mm }}" + } + }, + + "rain_mm_h": { + "device_type": "sensor", + "object_suffix": "RR", + "config": { + "device_class": "weather", + "name": "Rain Rate", + "unit_of_measurement": "mm/h", + "value_template": "{{ value_json.rain_mm_h }}" + } + }, + + # motion... + + # switches... + + "depth_cm": { + "device_type": "sensor", + "object_suffix": "D", + "config": { + "device_class": "depth", + "name": "Depth", + "unit_of_measurement": "cm", + "value_template": "{{ value_json.depth_cm }}" + } + }, +} + + +def mqtt_connect(client, userdata, flags, rc): + """Callback for MQTT connects.""" + print("MQTT connected: " + mqtt.connack_string(rc)) + if rc != 0: + print("Could not connect. Error: " + str(rc)) + else: + client.subscribe(MQTT_TOPIC) + + +def mqtt_disconnect(client, userdata, rc): + """Callback for MQTT disconnects.""" + print("MQTT disconnected: " + mqtt.connack_string(rc)) + + +def mqtt_message(client, userdata, msg): + """Callback for MQTT message PUBLISH.""" + try: + # Decode JSON payload + data = json.loads(msg.payload.decode()) + bridge_event_to_hass(client, msg.topic, data) + + except json.decoder.JSONDecodeError: + print("JSON decode error: " + msg.payload.decode()) + return + + +def sanitize(text): + """Sanitize a name for Graphite/MQTT use.""" + return (text + .replace(" ", "_") + .replace("/", "_") + .replace(".", "_") + .replace("&", "")) + + +def publish_config(mqttc, topic, model, instance, mapping): + """Publish Home Assistant auto discovery data.""" + global discovery_timeouts + + device_type = mapping["device_type"] + object_suffix = mapping["object_suffix"] + object_id = "-".join([model, instance, object_suffix]) + + path = "/".join([DISCOVERY_PREFIX, device_type, object_id, "config"]) + + # check timeout + now = time.time() + if path in discovery_timeouts: + if discovery_timeouts[path] > now: + return + + discovery_timeouts[path] = now + DISCOVERY_INTERVAL + + config = mapping["config"].copy() + config["state_topic"] = topic + + mqttc.publish(path, json.dumps(config)) + print(path, " : ", json.dumps(config)) + + +def bridge_event_to_hass(mqttc, topic, data): + """Translate some rtl_433 sensor data to Home Assistant auto discovery.""" + + if "model" not in data: + # not a device event + return + model = sanitize(data["model"]) + + if "channel" in data: + channel = str(data["channel"]) + instance = channel + elif "id" in data: + device_id = str(data["id"]) + instance = device_id + if not instance: + # no unique device identifier + return + + # detect known attributes + for key in data.keys(): + if key in mappings: + publish_config(mqttc, topic, model, instance, mappings[key]) + + +def rtl_433_bridge(): + """Run a MQTT Home Assistant auto discovery bridge for rtl_433.""" + mqttc = mqtt.Client() + mqttc.on_connect = mqtt_connect + mqttc.on_disconnect = mqtt_disconnect + mqttc.on_message = mqtt_message + mqttc.connect_async(MQTT_HOST, MQTT_PORT, 60) + mqttc.loop_start() + + while True: + time.sleep(1) + + +def run(): + """Run main or daemon.""" + # with daemon.DaemonContext(files_preserve=[sock]): + # detach_process=True + # uid + # gid + # working_directory + rtl_433_bridge() + + +if __name__ == "__main__": + run() diff --git a/include/output_mqtt.h b/include/output_mqtt.h new file mode 100644 index 000000000..fc9d2997e --- /dev/null +++ b/include/output_mqtt.h @@ -0,0 +1,19 @@ +/** @file + MQTT output for rtl_433 events + + Copyright (C) 2019 Christian Zuckschwerdt + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. +*/ + +#ifndef INCLUDE_OUTPUT_MQTT_H_ +#define INCLUDE_OUTPUT_MQTT_H_ + +#include "data.h" + +struct data_output *data_output_mqtt_create(char const *host, char const *port, char *opts, char const *dev_hint); + +#endif /* INCLUDE_OUTPUT_MQTT_H_ */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 0824cac1c..cc0655ba3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -15,6 +15,7 @@ add_executable(rtl_433 list.c mongoose.c optparse.c + output_mqtt.c pulse_demod.c pulse_detect.c r_util.c diff --git a/src/Makefile.am b/src/Makefile.am index 41af64de5..52490931f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -16,6 +16,7 @@ rtl_433_SOURCES = abuf.c \ list.c \ mongoose.c \ optparse.c \ + output_mqtt.c \ pulse_demod.c \ pulse_detect.c \ r_util.c \ diff --git a/src/output_mqtt.c b/src/output_mqtt.c new file mode 100644 index 000000000..2b46ef335 --- /dev/null +++ b/src/output_mqtt.c @@ -0,0 +1,490 @@ +/** @file + MQTT output for rtl_433 events + + Copyright (C) 2019 Christian Zuckschwerdt + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. +*/ + +// note: our unit header includes unistd.h for gethostname() via data.h +#include "output_mqtt.h" +#include "optparse.h" +#include "util.h" + +#include +#include +#include + +#include "mongoose.h" + +/* MQTT client abstraction */ + +typedef struct mqtt_client { + struct mg_send_mqtt_handshake_opts opts; + int prev_status; + char address[253 + 6 + 1]; // dns max + port + char client_id[256]; + uint16_t message_id; + int publish_flags; // MG_MQTT_RETAIN | MG_MQTT_QOS(0) +} mqtt_client_t; + +static void mqtt_client_event(struct mg_connection *nc, int ev, void *ev_data) +{ + // note that while shutting down the ctx is NULL + mqtt_client_t *ctx = (mqtt_client_t *)nc->mgr->user_data; + // only valid in MG_EV_MQTT_ events + struct mg_mqtt_message *msg = (struct mg_mqtt_message *)ev_data; + + //if (ev != MG_EV_POLL) + // fprintf(stderr, "MQTT user handler got event %d\n", ev); + + switch (ev) { + case MG_EV_CONNECT: { + int connect_status = *(int *)ev_data; + if (connect_status == 0) { + // Success + fprintf(stderr, "MQTT Connected...\n"); + mg_set_protocol_mqtt(nc); + if (ctx) + mg_send_mqtt_handshake_opt(nc, ctx->client_id, ctx->opts); + } + else { + // Error, print only once + if (ctx && ctx->prev_status != connect_status) + fprintf(stderr, "MQTT connect error: %s\n", strerror(connect_status)); + } + if (ctx) + ctx->prev_status = connect_status; + break; + } + case MG_EV_MQTT_CONNACK: + if (msg->connack_ret_code != MG_EV_MQTT_CONNACK_ACCEPTED) { + fprintf(stderr, "MQTT Connection error: %d\n", msg->connack_ret_code); + } + else { + fprintf(stderr, "MQTT Connection established.\n"); + } + break; + case MG_EV_MQTT_PUBACK: + fprintf(stderr, "MQTT Message publishing acknowledged (msg_id: %d)\n", msg->message_id); + break; + case MG_EV_MQTT_SUBACK: + fprintf(stderr, "MQTT Subscription acknowledged.\n"); + break; + case MG_EV_MQTT_PUBLISH: { + fprintf(stderr, "MQTT Incoming message %.*s: %.*s\n", (int)msg->topic.len, + msg->topic.p, (int)msg->payload.len, msg->payload.p); + break; + } + case MG_EV_CLOSE: + if (!ctx) + break; // shuttig down + if (ctx->prev_status == 0) + fprintf(stderr, "MQTT Connection failed...\n"); + // reconnect + if (mg_connect(nc->mgr, ctx->address, mqtt_client_event) == NULL) { + fprintf(stderr, "MQTT connect(%s) failed\n", ctx->address); + } + break; + } +} + +static struct mg_mgr *mqtt_client_init(char const *host, char const *port, char const *user, char const *pass, char const *client_id, int retain) +{ + struct mg_mgr *mgr = calloc(1, sizeof(*mgr)); + if (!mgr) { + fprintf(stderr, "calloc() failed in %s() %s:%d\n", __func__, __FILE__, __LINE__); + exit(1); + } + + mqtt_client_t *ctx = calloc(1, sizeof(*ctx)); + if (!ctx) { + fprintf(stderr, "calloc() failed in %s() %s:%d\n", __func__, __FILE__, __LINE__); + exit(1); + } + ctx->opts.user_name = user; + ctx->opts.password = pass; + ctx->publish_flags = MG_MQTT_QOS(0) | (retain ? MG_MQTT_RETAIN : 0); + // TODO: these should be user configurable options + //ctx->opts.keepalive = 60; + //ctx->timeout = 10000L; + //ctx->cleansession = 1; + strncpy(ctx->client_id, client_id, sizeof(ctx->client_id)); + + mg_mgr_init(mgr, ctx); + + // if the host is an IPv6 address it needs quoting + if (strchr(host, ':')) + snprintf(ctx->address, sizeof(ctx->address), "[%s]:%s", host, port); + else + snprintf(ctx->address, sizeof(ctx->address), "%s:%s", host, port); + + if (mg_connect(mgr, ctx->address, mqtt_client_event) == NULL) { + fprintf(stderr, "MQTT connect(%s) failed\n", ctx->address); + exit(1); + } + + return mgr; +} + +static int mqtt_client_poll(struct mg_mgr *mgr) +{ + return mg_mgr_poll(mgr, 0); +} + +static void mqtt_client_publish(struct mg_mgr *mgr, char const *topic, char const *str) +{ + mqtt_client_t *ctx = (mqtt_client_t *)mgr->user_data; + ctx->message_id++; + + for (struct mg_connection *c = mg_next(mgr, NULL); c != NULL; c = mg_next(mgr, c)) { + if (c->proto_handler) + mg_mqtt_publish(c, topic, ctx->message_id, ctx->publish_flags, str, strlen(str)); + } +} + +static void mqtt_client_free(struct mg_mgr *mgr) +{ + free(mgr->user_data); + mgr->user_data = NULL; + mg_mgr_free(mgr); +} + +/* Helper */ + +/// clean the topic inplace to [-.A-Za-z0-9], esp. not whitespace, +, #, /, $ +static char *mqtt_sanitize_topic(char *topic) +{ + for (char *p = topic; *p; ++p) + if (*p != '-' && *p != '.' && (*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && (*p < '0' || *p > '9')) + *p = '_'; + + return topic; +} + +/* MQTT printer */ + +typedef enum { + USE_CHANNEL_NO = 0, + USE_CHANNEL_REPLACE_ID = 1, + USE_CHANNEL_AFTER_ID, + USE_CHANNEL_BEFORE_ID, +} use_channel_t; + +typedef struct { + struct data_output output; + struct mg_mgr *mgr; + char topic[256]; + char *devices; + char *events; + char *states; + use_channel_t use_channel; + //char *homie; + //char *hass; +} data_output_mqtt_t; + +static void print_mqtt_array(data_output_t *output, data_array_t *array, char *format) +{ + data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output; + + char *orig = mqtt->topic + strlen(mqtt->topic); // save current topic + + for (int c = 0; c < array->num_values; ++c) { + sprintf(orig, "/%d", c); + print_array_value(output, array, format, c); + } + *orig = '\0'; // restore topic +} + +static char *append_topic(char *topic, data_t *data) +{ + if (data->type == DATA_STRING) { + *topic++ = '/'; + strcpy(topic, data->value); + mqtt_sanitize_topic(topic); + topic += strlen(data->value); + } + else if (data->type == DATA_INT) { + *topic++ = '/'; + topic += sprintf(topic, "%d", *(int *)data->value); + } + else { + fprintf(stderr, "Can't append data type %d to topic\n", data->type); + } + + return topic; +} + +// /[/][/][/][|/]battery: "OK"|"LOW" +static void print_mqtt_data(data_output_t *output, data_t *data, char *format) +{ + data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output; + + char *orig = mqtt->topic + strlen(mqtt->topic); // save current topic + char *end = orig; + + // collect well-known top level keys + data_t *data_type = NULL; + data_t *data_brand = NULL; + data_t *data_model = NULL; + data_t *data_subtype = NULL; + data_t *data_channel = NULL; + data_t *data_id = NULL; + for (data_t *d = data; d; d = d->next) { + if (!strcmp(d->key, "type")) + data_type = d; + else if (!strcmp(d->key, "brand")) + data_brand = d; + else if (!strcmp(d->key, "model")) + data_model = d; + else if (!strcmp(d->key, "subtype")) + data_subtype = d; + else if (!strcmp(d->key, "channel")) + data_channel = d; + else if (!strcmp(d->key, "id")) + data_id = d; + } + + // top-level only + if (!*mqtt->topic) { + // "states" topic + if (!data_model) { + if (mqtt->states) { + size_t message_size = 20000; // state message need a large buffer + char *message = malloc(message_size); + data_print_jsons(data, message, message_size); + mqtt_client_publish(mqtt->mgr, mqtt->states, message); + free(message); + } + return; + } + + // "events" topic + if (mqtt->events) { + char message[1024]; // we expect the biggest strings to be around 500 bytes. + data_print_jsons(data, message, sizeof(message)); + mqtt_client_publish(mqtt->mgr, mqtt->events, message); + } + + // "devices" topic + if (!mqtt->devices) { + return; + } + + strcpy(mqtt->topic, mqtt->devices); + end = mqtt->topic + strlen(mqtt->topic); + } + + // create topic + if (data_type) + end = append_topic(end, data_type); + if (data_brand) + end = append_topic(end, data_brand); + if (data_model) + end = append_topic(end, data_model); + if (data_subtype) + end = append_topic(end, data_subtype); + + if (mqtt->use_channel == USE_CHANNEL_REPLACE_ID) { + if (data_channel) + end = append_topic(end, data_channel); + else if (data_id) + end = append_topic(end, data_id); + } + else if (mqtt->use_channel == USE_CHANNEL_AFTER_ID) { + if (data_id) + end = append_topic(end, data_id); + if (data_channel) + end = append_topic(end, data_channel); + } + else if (mqtt->use_channel == USE_CHANNEL_BEFORE_ID) { + if (data_channel) + end = append_topic(end, data_channel); + if (data_id) + end = append_topic(end, data_id); + } + else /* USE_CHANNEL_NO */ { + if (data_id) + end = append_topic(end, data_id); + } + + while (data) { + if (!strcmp(data->key, "time") + || !strcmp(data->key, "type") + || !strcmp(data->key, "brand") + || !strcmp(data->key, "model") + || !strcmp(data->key, "subtype")) { + // skip, except "id", "channel" + } + else { + // push topic + *end = '/'; + strcpy(end + 1, data->key); + print_value(output, data->type, data->value, data->format); + *end = '\0'; // pop topic + } + data = data->next; + } + *orig = '\0'; // restore topic +} + +static void print_mqtt_string(data_output_t *output, char const *str, char *format) +{ + data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output; + mqtt_client_publish(mqtt->mgr, mqtt->topic, str); +} + +static void print_mqtt_double(data_output_t *output, double data, char *format) +{ + char str[20]; + int ret = snprintf(str, 20, "%f", data); + print_mqtt_string(output, str, format); +} + +static void print_mqtt_int(data_output_t *output, int data, char *format) +{ + char str[20]; + int ret = snprintf(str, 20, "%d", data); + print_mqtt_string(output, str, format); +} + +static void data_output_mqtt_poll(data_output_t *output) +{ + data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output; + + if (!mqtt) + return; + + mqtt_client_poll(mqtt->mgr); +} + +static void data_output_mqtt_free(data_output_t *output) +{ + data_output_mqtt_t *mqtt = (data_output_mqtt_t *)output; + + if (!mqtt) + return; + + free(mqtt->devices); + free(mqtt->events); + free(mqtt->states); + //free(mqtt->homie); + //free(mqtt->hass); + + mqtt_client_free(mqtt->mgr); + free(mqtt); +} + +static char *mqtt_topic_default(char const *topic, char const *base, char const *suffix) +{ + if (topic) + return strdup(topic); + + if (!base) + return strdup(suffix); + + char path[128]; + snprintf(path, sizeof(path), "%s/%s", base, suffix); + return strdup(path); +} + +struct data_output *data_output_mqtt_create(char const *host, char const *port, char *opts, char const *dev_hint) +{ + data_output_mqtt_t *mqtt = calloc(1, sizeof(data_output_mqtt_t)); + if (!mqtt) { + fprintf(stderr, "calloc() failed in %s() %s:%d\n", __func__, __FILE__, __LINE__); + exit(1); + } + + char hostname[64]; + gethostname(hostname, sizeof(hostname) - 1); + hostname[sizeof(hostname) - 1] = '\0'; + // only use hostname, not domain part + char *dot = strchr(hostname, '.'); + if (dot) + *dot = '\0'; + //fprintf(stderr, "Hostname: %s\n", hostname); + + // generate a short deterministic client_id to identify this input device on restart + uint16_t host_crc = crc16((uint8_t *)hostname, strlen(hostname), 0x1021, 0xffff); + uint16_t devq_crc = crc16((uint8_t *)dev_hint, dev_hint ? strlen(dev_hint) : 0, 0x1021, 0xffff); + char client_id[17]; + snprintf(client_id, sizeof(client_id), "rtl_433-%04x%04x", host_crc, devq_crc); + + // default base topic + char base_topic[8 + sizeof(hostname)]; + snprintf(base_topic, sizeof(base_topic), "rtl_433/%s", hostname); + + char *user = NULL; + char *pass = NULL; + int retain = 0; + + // defaults + mqtt->use_channel = USE_CHANNEL_REPLACE_ID; + + // parse auth and format options + char *key, *val; + while (getkwargs(&opts, &key, &val)) { + key = remove_ws(key); + val = trim_ws(val); + if (!key || !*key) + continue; + else if (!strcasecmp(key, "u") || !strcasecmp(key, "user")) + user = val; + else if (!strcasecmp(key, "p") || !strcasecmp(key, "pass")) + pass = val; + else if (!strcasecmp(key, "r") || !strcasecmp(key, "retain")) + retain = atobv(val, 1); + // Simple key-topic mapping + else if (!strcasecmp(key, "d") || !strcasecmp(key, "devices")) + mqtt->devices = mqtt_topic_default(val, base_topic, "devices"); + else if (!strcasecmp(key, "c") || !strcasecmp(key, "usechannel")) { + if (!strcasecmp(val, "afterid")) + mqtt->use_channel = USE_CHANNEL_AFTER_ID; + else if (!strcasecmp(val, "beforeid")) + mqtt->use_channel = USE_CHANNEL_BEFORE_ID; + else if (!strcasecmp(val, "replaceid")) + mqtt->use_channel = USE_CHANNEL_REPLACE_ID; + else + mqtt->use_channel = atobv(val, USE_CHANNEL_REPLACE_ID); + } + // JSON events to single topic + else if (!strcasecmp(key, "e") || !strcasecmp(key, "events")) + mqtt->events = mqtt_topic_default(val, base_topic, "events"); + // JSON states to single topic + else if (!strcasecmp(key, "s") || !strcasecmp(key, "states")) + mqtt->states = mqtt_topic_default(val, base_topic, "states"); + // TODO: Homie Convention https://homieiot.github.io/ + //else if (!strcasecmp(key, "o") || !strcasecmp(key, "homie")) + // mqtt->homie = mqtt_topic_default(val, NULL, "homie"); // base topic + // TODO: Home Assistant MQTT discovery https://www.home-assistant.io/docs/mqtt/discovery/ + //else if (!strcasecmp(key, "a") || !strcasecmp(key, "hass")) + // mqtt->hass = mqtt_topic_default(val, NULL, "homeassistant"); // discovery prefix + else { + printf("Invalid key \"%s\" option.\n", key); + exit(1); + } + } + + // Default is to use all formats + if (!mqtt->devices && !mqtt->events && !mqtt->states) { + mqtt->devices = mqtt_topic_default(NULL, base_topic, "devices"); + mqtt->events = mqtt_topic_default(NULL, base_topic, "events"); + mqtt->states = mqtt_topic_default(NULL, base_topic, "states"); + } + + mqtt->output.print_data = print_mqtt_data; + mqtt->output.print_array = print_mqtt_array; + mqtt->output.print_string = print_mqtt_string; + mqtt->output.print_double = print_mqtt_double; + mqtt->output.print_int = print_mqtt_int; + mqtt->output.output_poll = data_output_mqtt_poll; + mqtt->output.output_free = data_output_mqtt_free; + + mqtt->mgr = mqtt_client_init(host, port, user, pass, client_id, retain); + + return &mqtt->output; +} diff --git a/src/rtl_433.c b/src/rtl_433.c index bf907ba0a..0bd613c76 100644 --- a/src/rtl_433.c +++ b/src/rtl_433.c @@ -38,6 +38,7 @@ #include "pulse_demod.h" #include "decoder.h" #include "data.h" +#include "output_mqtt.h" #include "r_util.h" #include "optparse.h" #include "fileformat.h" @@ -170,7 +171,7 @@ static void usage(int exit_code) " [-w | help] Save data stream to output file (a '-' dumps samples to stdout)\n" " [-W | help] Save data stream to output file, overwrite existing file\n" "\t\t= Data output options =\n" - " [-F kv | json | csv | syslog | null | help] Produce decoded output in given format.\n" + " [-F kv | json | csv | mqtt | syslog | null | help] Produce decoded output in given format.\n" " Append output to file with : (e.g. -F csv:log.csv), defaults to stdout.\n" " Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514\n" " [-M time | reltime | notime | hires | utc | protocol | level | stats | bits | help] Add various meta data to each output.\n" @@ -238,9 +239,17 @@ static void help_gain(void) static void help_output(void) { fprintf(stderr, - "[-F kv|json|csv|syslog|null] Produce decoded output in given format.\n" + "[-F kv|json|csv|mqtt|syslog|null] Produce decoded output in given format.\n" "\tWithout this option the default is KV output. Use \"-F null\" to remove the default.\n" "\tAppend output to file with : (e.g. -F csv:log.csv), defaults to stdout.\n" + "\tSpecify MQTT server with e.g. -F mqtt://localhost:1883\n" + "\tAdd MQTT options with e.g. -F \"mqtt://host:1883,opt=arg\"\n" + "\tMQTT options are: user=foo, pass=bar, retain[=0|1],\n" + "\t\t usechannel=replaceid|afterid|beforeid|no, [=topic]\n" + "\tSupported MQTT formats: (default is all)\n" + "\t events: posts JSON event data\n" + "\t states: posts JSON state data\n" + "\t devices: posts device and sensor info in nested topics\n" "\tSpecify host/port for syslog with e.g. -F syslog:127.0.0.1:1514\n"); exit(0); } @@ -1215,6 +1224,16 @@ static void add_kv_output(r_cfg_t *cfg, char *param) list_push(&cfg->output_handler, data_output_kv_create(fopen_output(param))); } +static void add_mqtt_output(r_cfg_t *cfg, char *param) +{ + char *host = "localhost"; + char *port = "1883"; + char *opts = hostport_param(param, &host, &port); + fprintf(stderr, "Publishing MQTT UDP datagrams to %s port %s\n", host, port); + + list_push(&cfg->output_handler, data_output_mqtt_create(host, port, opts, cfg->dev_query)); +} + static void add_syslog_output(r_cfg_t *cfg, char *param) { char *host = "localhost"; @@ -1596,6 +1615,9 @@ static void parse_conf_option(r_cfg_t *cfg, int opt, char *arg) else if (strncmp(arg, "kv", 2) == 0) { add_kv_output(cfg, arg_param(arg)); } + else if (strncmp(arg, "mqtt", 4) == 0) { + add_mqtt_output(cfg, arg_param(arg)); + } else if (strncmp(arg, "syslog", 6) == 0) { add_syslog_output(cfg, arg_param(arg)); } diff --git a/vs15/rtl_433.vcxproj b/vs15/rtl_433.vcxproj index ab7eabf2a..ed0a8ab70 100644 --- a/vs15/rtl_433.vcxproj +++ b/vs15/rtl_433.vcxproj @@ -99,6 +99,7 @@ + @@ -125,6 +126,7 @@ + diff --git a/vs15/rtl_433.vcxproj.filters b/vs15/rtl_433.vcxproj.filters index 37b0ed333..ce581a498 100644 --- a/vs15/rtl_433.vcxproj.filters +++ b/vs15/rtl_433.vcxproj.filters @@ -59,6 +59,9 @@ Header Files + + Header Files + Header Files @@ -133,6 +136,9 @@ Source Files + + Source Files + Source Files