Skip to content

Commit

Permalink
examples: Add dup filtering to mqtt_relay
Browse files Browse the repository at this point in the history
Keep information about the previous value sent.  If it's been 5
seconds, or new value is different (ignoring keys like snr and
frequency), then send it.  Otherwise, just don't.  This causes bursts
of e.g. 4 transmissions to result in one MQTT message, on the theory
that the 4 transmissions are not actually 4 messags, but a strategy to
transmit one message more reliably.

Define a new configuration option to enable duplicate filtering, and
default it to True.

Steal logging config from mqtt_filter.py, and add a configuration
option DEBUG that if True results in debug logging instead of info.
  • Loading branch information
gdt committed Aug 4, 2024
1 parent 3821498 commit 0aa31f5
Showing 1 changed file with 100 additions and 7 deletions.
107 changes: 100 additions & 7 deletions examples/rtl_433_mqtt_relay.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,37 @@
from __future__ import print_function
from __future__ import with_statement

import socket
import json
import paho.mqtt.client as mqtt
import logging
import socket
import time

import paho.mqtt.client as mqtt

# The config class represents a config object. The constructor takes
# an optional pathname, and will switch on the suffix (.yaml for now)
# and read a dictionary.
class rtlconfig(object):

# Initialize with default values.
c = {
# Syslog socket configuration
# Set log level to info (False) or debug (True).
'DEBUG': False,

# Address to listen on for syslog/json messages from rtl_433
'UDP_IP': "127.0.0.1",
'UDP_PORT': 1433,

# MQTT broker configuration
# MQTT broker credentials
'MQTT_HOST': "127.0.0.1",
'MQTT_PORT': 1883,
'MQTT_USERNAME': None,
'MQTT_PASSWORD': None,
'MQTT_TLS': False,

# MQTT content
'MQTT_PREFIX': "sensor/rtl_433",
'MQTT_DEDUP': True,
'MQTT_INDIVIDUAL_TOPICS': True,
'MQTT_JSON_TOPIC': True,
}
Expand All @@ -68,17 +77,86 @@ def __init__(self, f=None):
def __getitem__(self, k):
return self.c[k]

# A dedup class object supports deduping a stream of reports by
# answering if a report is interesting relative to the history.
# While more complicated deduping is allowed by the interface, for now
# it is very simple, keeping track of only the previous interesting object.
# For now, we more or less require that all reports have the same keys.
# \todo Consider a cache with several entries.
class dedup(object):

def __init__(self):
# Make this long enough to skip repeats, but allow messages
# every 10s to come through.
self.duration = 5
# Exclude reception metadata (time and RF).
self.boring_keys = ('time', 'freq', 'freq1', 'freq2', 'rssi', 'snr', 'noise', 'raw_msg')
# Initialize storage for what was last sent.
(self.last_report, self.last_now) = (None, None)

def send_store(self, report, n):
(self.last_report, self.last_now) = (report, n)
return True

# Return True if j1 and j2 are the same, except for boring_keys.
def equiv(self, j1, j2):
for (k, v) in j1.items():
# If in boring, we don't care.
if k not in self.boring_keys:
# If in j1 and not j2, they are different.
if k not in j2:
logging.debug("equiv: %s in j1 and not j2" % (k))
return False
if j1[k] != j2[k]:
logging.debug("equiv: %s differs j1=%s and j2=%s" % (k, j1[k], j2[k]))
return False
# If the lengths are different, they must be different.
if len(j1) != len(j2):
logging.debug("equiv: len(j1) %d != len(j2) %d" % (len(j1), len(j2)))
return False

# If we get here, then the lengths are the same, and all
# non-boring keys in j1 exist in j2, and have the same value.
# It could be that j2 is missing a boring key and also has a
# new non-boring key, but boring keys in particular should not
# be variable.
return True

# report is a python dictionary
def is_interesting(self, report):
n = time.time()

# If previous interesting is empty (or troubled), accept this
# one.
if self.last_report is None or self.last_now is None:
logging.debug("interesting: no previous")
return self.send_store(report, n)

# If previous one was too long ago, accept this one.
if n - self.last_now > self.duration:
logging.debug("interesting: time")
return self.send_store(report, n)

if not self.equiv(self.last_report, report):
logging.debug("interesting: different")
return self.send_store(report, n)

return False

# Create a config object, defaults modified by the config file if present.
c = rtlconfig("rtl_433_mqtt_relay.yaml")

# Create a dedup object for later use, even if it's configure off.
d = dedup()

def mqtt_connect(client, userdata, flags, rc):
"""Handle MQTT connection callback."""
print("MQTT connected: " + mqtt.connack_string(rc))
logging.info("MQTT connected: " + mqtt.connack_string(rc))


def mqtt_disconnect(client, userdata, rc):
"""Handle MQTT disconnection callback."""
print("MQTT disconnected: " + mqtt.connack_string(rc))
logging.info("MQTT disconnected: " + mqtt.connack_string(rc))


# Create listener for incoming json string packets.
Expand All @@ -100,6 +178,14 @@ def sanitize(text):
def publish_sensor_to_mqtt(mqttc, data, line):
"""Publish rtl_433 sensor data to MQTT."""

if c['MQTT_DEDUP']:
# If this data is not novel relative to recent data, just skip it.
# Otherwise, send it via MQTT.
if not d.is_interesting(data):
logging.debug(" not interesting")
return
logging.debug( "INTERESTING")

# Construct a topic from the information that identifies which
# device this frame is from.
# NB: id is only used if channel is not present.
Expand Down Expand Up @@ -166,6 +252,7 @@ def rtl_433_probe():
try:
line = parse_syslog(line)
data = json.loads(line)
logging.debug("received %s" % line)
publish_sensor_to_mqtt(mqttc, data, line)

except ValueError:
Expand All @@ -179,8 +266,14 @@ def run():
# uid
# gid
# working_directory
rtl_433_probe()

logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',datefmt='%Y-%m-%dT%H:%M:%S%z')
logging.getLogger().setLevel(logging.INFO)
if c['DEBUG']:
logging.getLogger().setLevel(logging.DEBUG)
logging.debug("DEBUG LOGGING ENABLED")

rtl_433_probe()

if __name__ == "__main__":
run()

0 comments on commit 0aa31f5

Please sign in to comment.