From 2edbbc2e20beea3c390ea5d1ca3a097ee0e67e87 Mon Sep 17 00:00:00 2001 From: RubenKelevra Date: Sun, 15 Dec 2024 14:53:41 +0100 Subject: [PATCH] add voltage compensation for dummy load measurements --- utils/measure/measure/powermeter/dummy.py | 6 +- utils/measure/measure/powermeter/errors.py | 4 + utils/measure/measure/powermeter/hass.py | 51 +++- utils/measure/measure/powermeter/kasa.py | 10 +- utils/measure/measure/powermeter/manual.py | 17 +- utils/measure/measure/powermeter/mystrom.py | 11 +- utils/measure/measure/powermeter/ocr.py | 9 +- .../measure/measure/powermeter/powermeter.py | 10 +- utils/measure/measure/powermeter/shelly.py | 22 +- utils/measure/measure/powermeter/tasmota.py | 11 +- utils/measure/measure/powermeter/tuya.py | 11 +- utils/measure/measure/util/measure_util.py | 253 ++++++++++++++++-- 12 files changed, 362 insertions(+), 53 deletions(-) diff --git a/utils/measure/measure/powermeter/dummy.py b/utils/measure/measure/powermeter/dummy.py index 83da7e42a..009c219f4 100644 --- a/utils/measure/measure/powermeter/dummy.py +++ b/utils/measure/measure/powermeter/dummy.py @@ -3,11 +3,13 @@ import time from typing import Any -from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter +from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter class DummyPowerMeter(PowerMeter): - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + if include_voltage: + return ExtendedPowerMeasurementResult(20.5, 233.0, time.time()) return PowerMeasurementResult(20.5, time.time()) def process_answers(self, answers: dict[str, Any]) -> None: diff --git a/utils/measure/measure/powermeter/errors.py b/utils/measure/measure/powermeter/errors.py index a4410c173..1d5458235 100644 --- a/utils/measure/measure/powermeter/errors.py +++ b/utils/measure/measure/powermeter/errors.py @@ -12,3 +12,7 @@ class ZeroReadingError(PowerMeterError): class ApiConnectionError(PowerMeterError): pass + + +class UnsupportedFeatureError(PowerMeterError): + pass diff --git a/utils/measure/measure/powermeter/hass.py b/utils/measure/measure/powermeter/hass.py index e27360fef..f19d52920 100644 --- a/utils/measure/measure/powermeter/hass.py +++ b/utils/measure/measure/powermeter/hass.py @@ -7,20 +7,22 @@ from homeassistant_api import Client from measure.powermeter.const import QUESTION_POWERMETER_ENTITY_ID -from measure.powermeter.errors import PowerMeterError -from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter +from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError +from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter class HassPowerMeter(PowerMeter): def __init__(self, api_url: str, token: str, call_update_entity: bool) -> None: self._call_update_entity = call_update_entity self._entity_id: str | None = None + self._voltage_entity_id: str | None = None try: self.client = Client(api_url, token, cache_session=False) except Exception as e: raise PowerMeterError(f"Failed to connect to HA API: {e}") from e - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + """Get a new power reading from Hass-API. Optionally include voltage.""" if self._call_update_entity: self.client.trigger_service( "homeassistant", @@ -33,7 +35,27 @@ def get_power(self) -> PowerMeasurementResult: if state == "unavailable": raise PowerMeterError(f"Power sensor {self._entity_id} unavailable") last_updated = state.last_updated.timestamp() - return PowerMeasurementResult(float(state.state), last_updated) + power_value = float(state.state) + + if include_voltage: + if not self._voltage_entity_id: + self._voltage_entity_id = self.find_voltage_entity() + if not self._voltage_entity_id: + raise UnsupportedFeatureError("No matching voltage entity found.") + + voltage_state = self.client.get_state(entity_id=self._voltage_entity_id) + if voltage_state == "unavailable": + raise PowerMeterError(f"Voltage sensor {self._voltage_entity_id} unavailable") + + voltage_value = float(voltage_state.state) + return ExtendedPowerMeasurementResult(power_value, voltage_value, last_updated) + + return PowerMeasurementResult(power_value, last_updated) + + def find_voltage_entity(self) -> str | None: + """Try to find a matching voltage entity for the current power entity.""" + matched_sensors = self.match_power_and_voltage_sensors() + return matched_sensors.get(self._entity_id) def get_questions(self) -> list[inquirer.questions.Question]: power_sensor_list = self.get_power_sensors() @@ -52,5 +74,26 @@ def get_power_sensors(self) -> list[str]: power_sensors = [entity.entity_id for entity in sensors if entity.state.attributes.get("unit_of_measurement") == "W"] return sorted(power_sensors) + def get_voltage_sensors(self) -> list[str]: + entities = self.client.get_entities() + sensors = entities["sensor"].entities.values() + voltage_sensors = [entity.entity_id for entity in sensors if entity.state.attributes.get("unit_of_measurement") == "V"] + return sorted(voltage_sensors) + + def match_power_and_voltage_sensors(self) -> dict[str, str]: + power_sensors = self.get_power_sensors() + voltage_sensors = self.get_voltage_sensors() + + # Create mappings based on base names + power_map = {sensor.rsplit("_power", 1)[0]: sensor for sensor in power_sensors} + voltage_map = {sensor.rsplit("_voltage", 1)[0]: sensor for sensor in voltage_sensors} + + matched_sensors = {} + for base_name, power_sensor in power_map.items(): + if base_name in voltage_map: + matched_sensors[power_sensor] = voltage_map[base_name] + + return matched_sensors + def process_answers(self, answers: dict[str, Any]) -> None: self._entity_id = answers[QUESTION_POWERMETER_ENTITY_ID] diff --git a/utils/measure/measure/powermeter/kasa.py b/utils/measure/measure/powermeter/kasa.py index 051dab1ba..3a8d907aa 100644 --- a/utils/measure/measure/powermeter/kasa.py +++ b/utils/measure/measure/powermeter/kasa.py @@ -6,14 +6,20 @@ from kasa import SmartPlug -from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter +from measure.powermeter.errors import UnsupportedFeatureError +from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter class KasaPowerMeter(PowerMeter): def __init__(self, device_ip: str) -> None: self._smartplug = SmartPlug(device_ip) - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + """Get a new power reading from the Kasa device. Optionally include voltage (FIXME: not yet implemented).""" + if include_voltage: + # FIXME: Not yet implemented + raise UnsupportedFeatureError("Voltage measurement is not yet implemented for Kasa devices.") + loop = asyncio.get_event_loop() power = loop.run_until_complete(self.async_read_power_meter()) return PowerMeasurementResult(power, time.time()) diff --git a/utils/measure/measure/powermeter/manual.py b/utils/measure/measure/powermeter/manual.py index 1045437da..f03422110 100644 --- a/utils/measure/measure/powermeter/manual.py +++ b/utils/measure/measure/powermeter/manual.py @@ -3,12 +3,23 @@ import time from typing import Any -from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter +from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter class ManualPowerMeter(PowerMeter): - def get_power(self) -> PowerMeasurementResult: - power = input("Input power measurement:") + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + """Manually enter power readings. Optionally enter voltage readings as well.""" + if include_voltage: + power = input("Input power measurement: ") + voltage = input("Input voltage measurement: ") + + return ExtendedPowerMeasurementResult( + power=float(power), + voltage=float(voltage), + updated=time.time(), + ) + + power = input("Input power measurement: ") return PowerMeasurementResult(float(power), time.time()) def process_answers(self, answers: dict[str, Any]) -> None: diff --git a/utils/measure/measure/powermeter/mystrom.py b/utils/measure/measure/powermeter/mystrom.py index 3fa1f4f9b..38f60e0a9 100644 --- a/utils/measure/measure/powermeter/mystrom.py +++ b/utils/measure/measure/powermeter/mystrom.py @@ -5,15 +5,20 @@ import requests -from measure.powermeter.errors import PowerMeterError -from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter +from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError +from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter class MyStromPowerMeter(PowerMeter): def __init__(self, device_ip: str) -> None: self._device_ip = device_ip - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + """Get a new power reading from the MyStrom device. Optionally include voltage (FIXME: not yet implemented).""" + if include_voltage: + # FIXME: Not yet implemented + raise UnsupportedFeatureError("Voltage measurement is not yet implemented for MyStrom devices.") + r = requests.get( f"http://{self._device_ip}/report", timeout=10, diff --git a/utils/measure/measure/powermeter/ocr.py b/utils/measure/measure/powermeter/ocr.py index 1dd4c581b..5bc77492f 100644 --- a/utils/measure/measure/powermeter/ocr.py +++ b/utils/measure/measure/powermeter/ocr.py @@ -3,7 +3,8 @@ import os from typing import Any -from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter +from measure.powermeter.errors import UnsupportedFeatureError +from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter class OcrPowerMeter(PowerMeter): @@ -16,7 +17,11 @@ def __init__(self) -> None: self.file = open(filepath, "rb") # noqa: SIM115 super().__init__() - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + """Get a new power reading via OCR.""" + if include_voltage: + raise UnsupportedFeatureError("Voltage measurement are not supported for OCR mode.") + last_line = self.read_last_line() (timestamp, power) = last_line.strip().split(";") power = float(power) diff --git a/utils/measure/measure/powermeter/powermeter.py b/utils/measure/measure/powermeter/powermeter.py index 01e1c794c..4c9da5c25 100644 --- a/utils/measure/measure/powermeter/powermeter.py +++ b/utils/measure/measure/powermeter/powermeter.py @@ -8,8 +8,8 @@ class PowerMeter(ABC): @abstractmethod - def get_power(self) -> PowerMeasurementResult: - """Get a power measurement from the meter""" + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + """Get a power measurement from the meter. Optionally include voltage readings.""" def get_questions(self) -> list[Question]: """Get questions to ask for the chosen powermeter""" @@ -23,3 +23,9 @@ def process_answers(self, answers: dict[str, Any]) -> None: class PowerMeasurementResult(NamedTuple): power: float updated: float + + +class ExtendedPowerMeasurementResult(NamedTuple): + power: float + voltage: float + updated: float diff --git a/utils/measure/measure/powermeter/shelly.py b/utils/measure/measure/powermeter/shelly.py index aeff0b05c..6b1f0d30a 100644 --- a/utils/measure/measure/powermeter/shelly.py +++ b/utils/measure/measure/powermeter/shelly.py @@ -7,8 +7,8 @@ import requests -from measure.powermeter.errors import ApiConnectionError -from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter +from measure.powermeter.errors import ApiConnectionError, UnsupportedFeatureError +from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter _LOGGER = logging.getLogger("measure") @@ -76,6 +76,13 @@ def _check_endpoint_availability(self, endpoint: str) -> bool: return True + def parse_json_with_voltage(self, json: dict) -> ExtendedPowerMeasurementResult: + return ExtendedPowerMeasurementResult( + power=float(json["apower"]), + voltage=float(json["voltage"]), + updated=time.time(), + ) + class ShellyPowerMeter(PowerMeter): def __init__(self, shelly_ip: str, timeout: int = 5) -> None: @@ -88,8 +95,8 @@ def __init__(self, shelly_ip: str, timeout: int = 5) -> None: self.api = ShellyApiGen2Plus(self.ip_address, self.timeout) self.api.check_gen2_plus_endpoints() - def get_power(self) -> PowerMeasurementResult: - """Get a new power reading from the Shelly device""" + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + """Get a new power reading from the Shelly device. Optionally include voltage.""" try: r = requests.get( f"http://{self.ip_address}{self.api.endpoint}", @@ -100,6 +107,13 @@ def get_power(self) -> PowerMeasurementResult: raise ApiConnectionError("Could not connect to Shelly Plug") from e json = r.json() + + if include_voltage: + if isinstance(self.api, ShellyApiGen1): + raise UnsupportedFeatureError("Voltage measurement is not supported on Shelly Gen1 devices") + if isinstance(self.api, ShellyApiGen2Plus): + return self.api.parse_json_with_voltage(json) + return self.api.parse_json(json) def _detect_api_version(self) -> int: diff --git a/utils/measure/measure/powermeter/tasmota.py b/utils/measure/measure/powermeter/tasmota.py index ab37a2899..023e8e087 100644 --- a/utils/measure/measure/powermeter/tasmota.py +++ b/utils/measure/measure/powermeter/tasmota.py @@ -5,15 +5,20 @@ import requests -from measure.powermeter.errors import PowerMeterError -from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter +from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError +from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter class TasmotaPowerMeter(PowerMeter): def __init__(self, device_ip: str) -> None: self._device_ip = device_ip - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + """Get a new power reading from the Tasmota device. Optionally include voltage (FIXME: not yet implemented).""" + if include_voltage: + # FIXME: Not yet implemented + raise UnsupportedFeatureError("Voltage measurement is not yet implemented for Tasmota devices.") + r = requests.get( f"http://{self._device_ip}/cm?cmnd=STATUS+8", timeout=10, diff --git a/utils/measure/measure/powermeter/tuya.py b/utils/measure/measure/powermeter/tuya.py index a98c2e286..79cd8d4e3 100644 --- a/utils/measure/measure/powermeter/tuya.py +++ b/utils/measure/measure/powermeter/tuya.py @@ -5,8 +5,8 @@ import tuyapower -from measure.powermeter.errors import PowerMeterError -from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter +from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError +from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter STATUS_OK = "OK" @@ -24,7 +24,12 @@ def __init__( self._device_key = device_key self._device_version = device_version - def get_power(self) -> PowerMeasurementResult: + def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult: + """Get a new power reading from the Tuya device. Optionally include voltage (FIXME: not yet implemented).""" + if include_voltage: + # FIXME: Not yet implemented + raise UnsupportedFeatureError("Voltage measurement is not yet implemented for Tuya devices.") + (_, w, _, _, err) = tuyapower.deviceInfo( self._device_id, self._device_ip, diff --git a/utils/measure/measure/util/measure_util.py b/utils/measure/measure/util/measure_util.py index 27b7e1377..711280c02 100644 --- a/utils/measure/measure/util/measure_util.py +++ b/utils/measure/measure/util/measure_util.py @@ -1,13 +1,19 @@ +from __future__ import annotations + import logging import os import time from datetime import datetime as dt +from statistics import mean + +import numpy as np from measure.config import MeasureConfig from measure.const import PROJECT_DIR from measure.powermeter.errors import ( OutdatedMeasurementError, PowerMeterError, + UnsupportedFeatureError, ZeroReadingError, ) from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter @@ -21,21 +27,115 @@ def __init__(self, power_meter: PowerMeter, config: MeasureConfig) -> None: self.dummy_load_value: float | None = None self.config = config - def take_average_measurement(self, duration: int) -> float: - """Measure average power consumption for a given time period in seconds""" - _LOGGER.info("Measuring average power for %d seconds", duration) + def take_average_measurement(self, duration: int, measure_resistance: bool = False) -> float: + """ + Measure the average power consumption or resistance for a given time period in seconds. + + This function calculates the average power or resistance by taking multiple readings over + the specified duration. If `measure_resistance` is True, the function computes resistance + using the formula R = V^2 / P, where V is the voltage and P is the power. If a dummy load + resistive value is set, the power consumption of the dummy load calculated for each + measurement, depending on the current voltage and is subtracted from the power + measurements. + + Args: + duration (int): The time duration (in seconds) over which to take measurements. + measure_resistance (bool): Whether to measure resistance instead of power. Defaults to False. + + Returns: + float: The average resistance (in Ω) or power (in W) over the measurement duration. + + Raises: + UnsupportedFeatureError: If `measure_resistance` is True but the power meter does not + support voltage measurements. + + Error handling: + - If voltage measurements are not supported, the program will terminate with an appropriate + error message, if measuring power consumption and a dummy load resistive value is set. + - For resistance measurements, voltage values below 1 are treated as invalid, and the program + exits to avoid incorrect calculations. + - Ignores single measurements of <= 0 W. + """ + _LOGGER.info("Measuring average %s over %s seconds", "resistance" if measure_resistance else "power", duration) start_time = time.time() readings: list[float] = [] + + first_measurement = True + while (time.time() - start_time) < duration: + if first_measurement: + first_measurement = False + else: + # sleep time exceeds duration + if not ((time.time() - start_time + self.config.sleep_time) < duration): + break + time.sleep(self.config.sleep_time) + + if measure_resistance: + result = self.power_meter.get_power(include_voltage=True) + power, voltage = result.power, result.voltage + + if voltage < 1: + _LOGGER.error("Error during measurement: Voltage measurement returned zero. Aborting measurement.") + exit(1) + + if round(power, 2) == 0: + _LOGGER.warning("Invalid measurement: power: %.2f W, voltage: %.2f", power, voltage) + continue + + resistance = round((voltage**2) / power, 4) + readings.append(resistance) + _LOGGER.info( + "Measured resistance: %.2f Ω; measured power: %.2f W, voltage: %.2f", + resistance, + power, + voltage, + ) + continue + + if self.dummy_load_value: # measurement with dummy load + try: + result = self.power_meter.get_power(include_voltage=True) + except UnsupportedFeatureError as e: + _LOGGER.error("Error during measurement: %s", e) + print("The selected power meter does not support voltage measurements, required to measure with dummy loads.") + exit(1) + + power, voltage = result.power, result.voltage + + if voltage < 1: + _LOGGER.error("Error during measurement: Voltage measurement returned zero. Aborting measurement.") + exit(1) + + dummy_power = (voltage**2) / self.dummy_load_value + power -= dummy_power + + if round(power, 2) <= 0: + _LOGGER.warning( + "Invalid measurement after subtracting dummy load consumption. Calculated consumption: %.2f W; ignoring", + power, + ) + continue + + readings.append(power) + _LOGGER.info("Measured power: %.2f W", power) + continue + + # measurement without dummy load power = self.power_meter.get_power().power - _LOGGER.info("Measured power: %.2f", power) + if round(power, 2) == 0: + _LOGGER.warning("Invalid measurement. Consumption: %.2f W; ignoring", power) + + continue readings.append(power) - time.sleep(self.config.sleep_time) - average = round(sum(readings) / len(readings), 2) - if self.dummy_load_value: - average -= self.dummy_load_value + _LOGGER.info("Measured power: %.2f W", power) - _LOGGER.info("Average power: %s", average) + if not readings: + _LOGGER.error("No valid readings were recorded.") + exit(1) + + average = round(mean(readings), 2) + _LOGGER.info("Average of %d measurements: %.2f %s", len(readings), average, "Ω" if measure_resistance else "W") return average def take_measurement( @@ -50,8 +150,13 @@ def take_measurement( _LOGGER.debug("Taking sample %d", i) error = None measurement: PowerMeasurementResult | None = None + try: - measurement = self.power_meter.get_power() + if self.dummy_load_value: + measurement = self.power_meter.get_power(include_voltage=True) + else: + measurement = self.power_meter.get_power() + updated_at = dt.fromtimestamp(measurement.updated).strftime( "%d-%m-%Y, %H:%M:%S", ) @@ -61,16 +166,29 @@ def take_measurement( if measurement: # Check if measurement is not outdated - if measurement.updated < start_timestamp: + if start_timestamp and measurement.updated < start_timestamp: error = OutdatedMeasurementError( "Power measurement is outdated. Aborting after %d successive retries", self.config.max_retries, ) + power = measurement.power + # Check if we not have a 0 measurement - if measurement.power == 0: + if round(power, 2) <= 0: error = ZeroReadingError("0 watt was read from the power meter") + if self.dummy_load_value: + voltage = measurement.voltage + if voltage < 1: + error = ZeroReadingError("0 Volt was read from the power meter") + else: + dummy_power = (voltage**2) / self.dummy_load_value + power -= dummy_power + + if round(power, 2) <= 0: + error = ZeroReadingError("0 watt was read from the power meter, after substracting the dummy load") + if error: # Prevent endless recursion. Throw error when max retries is reached if retry_count == self.config.max_retries: @@ -79,44 +197,129 @@ def take_measurement( time.sleep(self.config.sleep_time) return self.take_measurement(start_timestamp, retry_count) - measurements.append(measurement.power) + measurements.append(power) if self.config.sample_count > 1: time.sleep(self.config.sleep_time_sample) # Determine Average PM reading - average = sum(measurements) / len(measurements) - if self.dummy_load_value: - average -= self.dummy_load_value + if not measurements: + _LOGGER.error("No valid readings were recorded.") + exit(1) + + average = mean(measurements) + _LOGGER.info("Average measurement: %.3f W", average) return average def initialize_dummy_load(self) -> float: - """Get the previously measured dummy load value, or take a new measurement if it doesn't exist""" + """Get the previously measured dummy load resistance, or take a new measurement if it doesn't exist""" dummy_load_file = os.path.join( PROJECT_DIR, - ".persistent/dummy_load", + ".persistent/dummy_load_resistance", ) if os.path.exists(dummy_load_file): with open(dummy_load_file) as f: value = float(f.read()) - _LOGGER.info("Dummy load was already measured before, value: %sW", value) + _LOGGER.info("Dummy load was already measured before, value: %s Ω", value) + print("You need to preheat the dummy load, so it's consumption can stablize.") + print("If you're unsure the dummy load is sufficently preheated or you're using a different one, remeasure.") + print() inquirer = input("Do you want to measure the dummy load again? (y/n): ") if inquirer.lower() == "n": self.dummy_load_value = value return self.dummy_load_value + print() + print("Tip: Use a dummy load with constant power consumption. Stick to resistive loads for the best results!") + print("Important: Connect only the dummy load to your smart plug—not the device you're measuring.") + print() + print("The script will now measure the dummy load and continue once it's consumption has stablized.") + print("Depending on the dummy load this may take 2 hours.") + print() + input("Ready to begin measuring the dummy load? Hit Enter ") + self.dummy_load_value = self._measure_dummy_load(dummy_load_file) return self.dummy_load_value def _measure_dummy_load(self, file_path: str) -> float: """Measure the dummy load and persist the value for future measurement session""" - print() - print("Tip: Use a dummy load with constant power consumption. Stick to resistive loads for the best results!") - print("Important: Connect only the dummy load to your smart plug—not the device you're measuring.") - print("Preheat your dummy load until its temperature stabilizes. This usually takes about 2 hours.") - input("Ready to start? Press Enter to begin measuring the dummy load!") - average = self.take_average_measurement(1) + + measurements = 20 + duration = 30 + + print("Measuring and checking dummy load... this will take at least %.0f minutes." % (measurements / 60 * duration)) + + # Validate power meter is capable of measuring voltage + self._validate_voltage_support() + + while True: + averages = [self.take_average_measurement(duration, measure_resistance=True) for _ in range(measurements)] + + trend = self._check_trend(averages) + + if not trend: + _LOGGER.error("Error during measurement: No trend could be calculated") + exit(1) + + if trend == "steady": + break + + print(f"Dummy load resistance has not yet stablized and is {trend}, repeating.") + + average = round(mean(averages), 2) + + _LOGGER.info("Dummy load measurement completed. Resistance: %s Ω", average) + with open(file_path, "w") as f: f.write(str(average)) return average + + def _check_trend(self, averages: list[float]) -> str | None: + """ + Checks if the resistance readings of a dummy load are increasing, decreasing, or steady (fluctuating). + + Returns: + str: "increasing", "decreasing", or "steady" based on the trends. + None: if not enough samples were supplied + """ + if len(averages) < 20: + return None + + mid = len(averages) // 2 # Calculate the midpoint + + first_half = averages[:mid] + second_half = averages[mid:] + + # Helper function to calculate trend + def calc_trend(values: list[float]) -> float: + # Perform a linear regression to estimate the trend + x = np.arange(len(values)) + coeffs = np.polyfit(x, values, 1) # Linear fit: y = mx + c + return coeffs[0] # Extract the slope (m) + + first_trend = calc_trend(first_half) + second_trend = calc_trend(second_half) + + def trend_direction(slope: float, threshold: float = 0.01) -> str: + if slope > threshold: + return "increasing" + if slope < -threshold: + return "decreasing" + return "steady" + + first_trend = trend_direction(first_trend) + second_trend = trend_direction(second_trend) + + if first_trend == second_trend and first_trend != "steady": + return first_trend + return "steady" + + def _validate_voltage_support(self) -> None: + """Check if the power meter supports voltage readings.""" + try: + self.take_average_measurement(1, measure_resistance=True) + except UnsupportedFeatureError as e: + _LOGGER.error("Error during measurement: %s", e) + print("The selected power meter does not support voltage measurements, required to measure dummy loads.") + exit(1)