From c58ca462199a9f260ff652759c8eea4ab5eb7309 Mon Sep 17 00:00:00 2001 From: avollkopf <43980694+avollkopf@users.noreply.github.com> Date: Sat, 6 Jul 2024 09:06:48 +0200 Subject: [PATCH] test on optional offset for mqtt sensor (#139) --- cbpi/__init__.py | 2 +- cbpi/cli.py | 7 +- cbpi/config/config.yaml | 1 + cbpi/extension/ConfigUpdate/__init__.py | 20 +++- cbpi/extension/mqtt_sensor/__init__.py | 153 ++++++++++++++++++++++-- 5 files changed, 169 insertions(+), 14 deletions(-) diff --git a/cbpi/__init__.py b/cbpi/__init__.py index 33fa074c..46e01d88 100644 --- a/cbpi/__init__.py +++ b/cbpi/__init__.py @@ -1,3 +1,3 @@ -__version__ = "4.4.3.a2" +__version__ = "4.4.3.a3" __codename__ = "Yeast Starter" diff --git a/cbpi/cli.py b/cbpi/cli.py index 86e9e89e..8c991442 100644 --- a/cbpi/cli.py +++ b/cbpi/cli.py @@ -7,7 +7,12 @@ from cbpi.utils.utils import load_config from zipfile import ZipFile from cbpi.craftbeerpi import CraftBeerPi -import os, pwd +import os +try: + import pwd + module_pwd=True +except: + module_pwd=False import pkgutil import shutil import click diff --git a/cbpi/config/config.yaml b/cbpi/config/config.yaml index b15c7779..eddc66de 100644 --- a/cbpi/config/config.yaml +++ b/cbpi/config/config.yaml @@ -11,6 +11,7 @@ mqtt_host: localhost mqtt_port: 1883 mqtt_username: "" mqtt_password: "" +mqtt_offset: false username: cbpi password: 123 diff --git a/cbpi/extension/ConfigUpdate/__init__.py b/cbpi/extension/ConfigUpdate/__init__.py index 95f2a6cd..ab532c24 100644 --- a/cbpi/extension/ConfigUpdate/__init__.py +++ b/cbpi/extension/ConfigUpdate/__init__.py @@ -8,7 +8,7 @@ from cbpi.api import * from cbpi.api.config import ConfigType from cbpi.api.base import CBPiBase -import glob +import glob, yaml from cbpi import __version__ logger = logging.getLogger(__name__) @@ -19,6 +19,12 @@ def __init__(self,cbpi): self.cbpi = cbpi self._task = asyncio.create_task(self.run()) + def append_to_yaml(self, file_path, data_to_append): + + with open(file_path[0], 'a+') as file: + file.seek(0) + yaml.dump(data_to_append, file, default_flow_style=False) + async def run(self): logging.info("Check Config for required changes") @@ -67,7 +73,7 @@ async def run(self): CONFIG_STATUS = self.cbpi.config.get("CONFIG_STATUS", None) self.version=__version__ current_grid = self.cbpi.config.get("current_grid", None) - + mqtt_offset=self.cbpi.static_config.get("mqtt_offset", None) if boil_temp is None: logger.info("INIT Boil Temp Setting") @@ -558,6 +564,16 @@ async def run(self): except Exception as e: logging.error(e) + if mqtt_offset is None: + logging.info("INIT MQTT Offset in static config") + try: + static_config_file=glob.glob(self.cbpi.config_folder.get_file_path('config.yaml')) + data_to_append = {'mqtt_offset': False} + self.append_to_yaml(static_config_file, data_to_append) + pass + except Exception as e: + logging.error(e) + logging.warning('Unable to update database') ## Check if influxdbname is in config if CONFIG_STATUS is None or CONFIG_STATUS != self.version: diff --git a/cbpi/extension/mqtt_sensor/__init__.py b/cbpi/extension/mqtt_sensor/__init__.py index 68db8197..9dc6d250 100644 --- a/cbpi/extension/mqtt_sensor/__init__.py +++ b/cbpi/extension/mqtt_sensor/__init__.py @@ -8,16 +8,17 @@ import time from datetime import datetime + @parameters([Property.Text(label="Topic", configurable=True, description="MQTT Topic"), - Property.Text(label="PayloadDictionary", configurable=True, default_value="", - description="Where to find msg in payload, leave blank for raw payload"), - Property.Kettle(label="Kettle", description="Reduced logging if Kettle is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"), - Property.Fermenter(label="Fermenter", description="Reduced logging if Fermenter is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"), - Property.Number(label="ReducedLogging", configurable=True, description="Reduced logging frequency in seconds if selected Kettle or Fermenter is inactive (default:60 sec | 0 disabled)"), - Property.Number(label="Timeout", configurable=True, unit="sec", - description="Timeout in seconds to send notification (default:60 | deactivated: 0)"), - Property.Number(label="TempRange", configurable=True, unit="degree", - description="Temp range in degree between reading and target temp of fermenter/kettle. Larger difference shows different color in dashboard (default:0 | deactivated: 0)")]) + Property.Text(label="PayloadDictionary", configurable=True, default_value="", + description="Where to find msg in payload, leave blank for raw payload"), + Property.Kettle(label="Kettle", description="Reduced logging if Kettle is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"), + Property.Fermenter(label="Fermenter", description="Reduced logging if Fermenter is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"), + Property.Number(label="ReducedLogging", configurable=True, description="Reduced logging frequency in seconds if selected Kettle or Fermenter is inactive (default:60 sec | 0 disabled)"), + Property.Number(label="Timeout", configurable=True, unit="sec", + description="Timeout in seconds to send notification (default:60 | deactivated: 0)"), + Property.Number(label="TempRange", configurable=True, unit="degree", + description="Temp range in degree between reading and target temp of fermenter/kettle. Larger difference shows different color in dashboard (default:0 | deactivated: 0)")]) class MQTTSensor(CBPiSensor): def __init__(self, cbpi, id, props): @@ -135,6 +136,135 @@ def get_state(self): async def on_stop(self): self.subscribed = self.cbpi.satellite.unsubscribe(self.Topic, self.on_message) +@parameters([Property.Text(label="Topic", configurable=True, description="MQTT Topic"), + Property.Text(label="PayloadDictionary", configurable=True, default_value="", + description="Where to find msg in payload, leave blank for raw payload"), + Property.Kettle(label="Kettle", description="Reduced logging if Kettle is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"), + Property.Fermenter(label="Fermenter", description="Reduced logging if Fermenter is inactive / range warning in dashboard (only Kettle or Fermenter to be selected)"), + Property.Number(label="Offset", configurable=True, description="Offset for MQTT Sensor (default is 0). !!! Use this only with caution as offset for MQTT sensor should be defined on Sensor side !!!"), + Property.Number(label="ReducedLogging", configurable=True, description="Reduced logging frequency in seconds if selected Kettle or Fermenter is inactive (default:60 sec | 0 disabled)"), + Property.Number(label="Timeout", configurable=True, unit="sec", + description="Timeout in seconds to send notification (default:60 | deactivated: 0)"), + Property.Number(label="TempRange", configurable=True, unit="degree", + description="Temp range in degree between reading and target temp of fermenter/kettle. Larger difference shows different color in dashboard (default:0 | deactivated: 0)")]) +class MQTTSensorOffset(CBPiSensor): + + def __init__(self, cbpi, id, props): + super(MQTTSensorOffset, self).__init__(cbpi, id, props) + self.Topic = self.props.get("Topic", None) + self.offset = float(self.props.get("Offset", 0)) + self.payload_text = self.props.get("PayloadDictionary", None) + if self.payload_text != None: + self.payload_text = self.payload_text.split('.') + self.subscribed = self.cbpi.satellite.subscribe(self.Topic, self.on_message) + self.value: float = 999 + self.timeout=int(self.props.get("Timeout", 60)) + self.temprange=float(self.props.get("TempRange", 0)) + self.starttime = time.time() + self.notificationsend = False + self.nextchecktime=self.starttime+self.timeout + self.lastdata=time.time() + self.lastlog=0 + self.sensor=self.get_sensor(self.id) + self.reducedfrequency=int(self.props.get("ReducedLogging", 60)) + if self.reducedfrequency < 0: + self.reducedfrequency = 0 + self.kettleid=self.props.get("Kettle", None) + self.fermenterid=self.props.get("Fermenter", None) + self.reducedlogging = True if self.kettleid or self.fermenterid else False + + if self.kettleid is not None and self.fermenterid is not None: + self.reducedlogging=False + self.cbpi.notify("MQTTSensor", "Sensor '" + str(self.sensor.name) + "' cant't have Fermenter and Kettle defined for reduced logging / range warning.", NotificationType.WARNING, action=[NotificationAction("OK", self.Confirm)]) + + async def Confirm(self, **kwargs): + self.nextchecktime = time.time() + self.timeout + self.notificationsend = False + pass + + async def message(self): + target_timestring= datetime.fromtimestamp(self.lastdata) + self.cbpi.notify("MQTTSensor Timeout", "Sensor '" + str(self.sensor.name) + "' did not respond. Last data received: "+target_timestring.strftime("%D %H:%M"), NotificationType.WARNING, action=[NotificationAction("OK", self.Confirm)]) + pass + + async def on_message(self, message): + val = json.loads(message.payload.decode()) + try: + if self.payload_text is not None: + for key in self.payload_text: + val = val.get(key, None) + + if isinstance(val, (int, float, str)): + self.value = float(val)+self.offset + self.push_update(self.value) + if self.reducedlogging == True: + await self.logvalue() + else: + logging.info("MQTTSensor {} regular logging".format(self.sensor.name)) + self.log_data(self.value) + self.lastlog = time.time() + + if self.timeout !=0: + self.nextchecktime = time.time() + self.timeout + self.notificationsend = False + self.lastdata=time.time() + except Exception as e: + logging.error("MQTT Sensor Error {}".format(e)) + + async def logvalue(self): + self.kettle = self.get_kettle(self.kettleid) if self.kettleid is not None else None + self.fermenter = self.get_fermenter(self.fermenterid) if self.fermenterid is not None else None + now=time.time() + if self.kettle is not None: + try: + kettlestatus=self.kettle.instance.state + except: + kettlestatus=False + if kettlestatus: + self.log_data(self.value) + logging.info("MQTTSensor {} Kettle Active".format(self.sensor.name)) + self.lastlog = time.time() + else: + logging.info("MQTTSensor {} Kettle Inactive".format(self.sensor.name)) + if self.reducedfrequency != 0: + if now >= self.lastlog + self.reducedfrequency: + self.log_data(self.value) + self.lastlog = time.time() + logging.info("Logged with reduced freqency") + pass + + if self.fermenter is not None: + try: + fermenterstatus=self.fermenter.instance.state + except: + fermenterstatus=False + if fermenterstatus: + self.log_data(self.value) + logging.info("MQTTSensor {} Fermenter Active".format(self.sensor.name)) + self.lastlog = time.time() + else: + logging.info("MQTTSensor {} Fermenter Inactive".format(self.sensor.name)) + if self.reducedfrequency != 0: + if now >= self.lastlog + self.reducedfrequency: + self.log_data(self.value) + self.lastlog = time.time() + logging.info("Logged with reduced freqency") + pass + + async def run(self): + while self.running: + if self.timeout !=0: + if time.time() > self.nextchecktime and self.notificationsend == False: + await self.message() + self.notificationsend=True + await asyncio.sleep(1) + + def get_state(self): + return dict(value=self.value) + + async def on_stop(self): + self.subscribed = self.cbpi.satellite.unsubscribe(self.Topic, self.on_message) + def setup(cbpi): ''' @@ -145,4 +275,7 @@ def setup(cbpi): :return: ''' if str(cbpi.static_config.get("mqtt", False)).lower() == "true": - cbpi.plugin.register("MQTTSensor", MQTTSensor) + if str(cbpi.static_config.get("mqtt_offset", False)).lower() == "false": + cbpi.plugin.register("MQTTSensor", MQTTSensor) + else: + cbpi.plugin.register("MQTTSensor", MQTTSensorOffset)