Skip to content

Commit

Permalink
add voltage compensation for dummy load measurements
Browse files Browse the repository at this point in the history
  • Loading branch information
RubenKelevra committed Dec 16, 2024
1 parent e7b1950 commit 2edbbc2
Show file tree
Hide file tree
Showing 12 changed files with 362 additions and 53 deletions.
6 changes: 4 additions & 2 deletions utils/measure/measure/powermeter/dummy.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
import time
from typing import Any

from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter
from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter


class DummyPowerMeter(PowerMeter):
def get_power(self) -> PowerMeasurementResult:
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
if include_voltage:
return ExtendedPowerMeasurementResult(20.5, 233.0, time.time())
return PowerMeasurementResult(20.5, time.time())

def process_answers(self, answers: dict[str, Any]) -> None:
Expand Down
4 changes: 4 additions & 0 deletions utils/measure/measure/powermeter/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ class ZeroReadingError(PowerMeterError):

class ApiConnectionError(PowerMeterError):
pass


class UnsupportedFeatureError(PowerMeterError):
pass
51 changes: 47 additions & 4 deletions utils/measure/measure/powermeter/hass.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,22 @@
from homeassistant_api import Client

from measure.powermeter.const import QUESTION_POWERMETER_ENTITY_ID
from measure.powermeter.errors import PowerMeterError
from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter
from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError
from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter


class HassPowerMeter(PowerMeter):
def __init__(self, api_url: str, token: str, call_update_entity: bool) -> None:
self._call_update_entity = call_update_entity
self._entity_id: str | None = None
self._voltage_entity_id: str | None = None
try:
self.client = Client(api_url, token, cache_session=False)
except Exception as e:
raise PowerMeterError(f"Failed to connect to HA API: {e}") from e

def get_power(self) -> PowerMeasurementResult:
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
"""Get a new power reading from Hass-API. Optionally include voltage."""
if self._call_update_entity:
self.client.trigger_service(
"homeassistant",
Expand All @@ -33,7 +35,27 @@ def get_power(self) -> PowerMeasurementResult:
if state == "unavailable":
raise PowerMeterError(f"Power sensor {self._entity_id} unavailable")
last_updated = state.last_updated.timestamp()
return PowerMeasurementResult(float(state.state), last_updated)
power_value = float(state.state)

if include_voltage:
if not self._voltage_entity_id:
self._voltage_entity_id = self.find_voltage_entity()
if not self._voltage_entity_id:
raise UnsupportedFeatureError("No matching voltage entity found.")

voltage_state = self.client.get_state(entity_id=self._voltage_entity_id)
if voltage_state == "unavailable":
raise PowerMeterError(f"Voltage sensor {self._voltage_entity_id} unavailable")

voltage_value = float(voltage_state.state)
return ExtendedPowerMeasurementResult(power_value, voltage_value, last_updated)

return PowerMeasurementResult(power_value, last_updated)

def find_voltage_entity(self) -> str | None:
"""Try to find a matching voltage entity for the current power entity."""
matched_sensors = self.match_power_and_voltage_sensors()
return matched_sensors.get(self._entity_id)

def get_questions(self) -> list[inquirer.questions.Question]:
power_sensor_list = self.get_power_sensors()
Expand All @@ -52,5 +74,26 @@ def get_power_sensors(self) -> list[str]:
power_sensors = [entity.entity_id for entity in sensors if entity.state.attributes.get("unit_of_measurement") == "W"]
return sorted(power_sensors)

def get_voltage_sensors(self) -> list[str]:
entities = self.client.get_entities()
sensors = entities["sensor"].entities.values()
voltage_sensors = [entity.entity_id for entity in sensors if entity.state.attributes.get("unit_of_measurement") == "V"]
return sorted(voltage_sensors)

def match_power_and_voltage_sensors(self) -> dict[str, str]:
power_sensors = self.get_power_sensors()
voltage_sensors = self.get_voltage_sensors()

# Create mappings based on base names
power_map = {sensor.rsplit("_power", 1)[0]: sensor for sensor in power_sensors}
voltage_map = {sensor.rsplit("_voltage", 1)[0]: sensor for sensor in voltage_sensors}

matched_sensors = {}
for base_name, power_sensor in power_map.items():
if base_name in voltage_map:
matched_sensors[power_sensor] = voltage_map[base_name]

return matched_sensors

def process_answers(self, answers: dict[str, Any]) -> None:
self._entity_id = answers[QUESTION_POWERMETER_ENTITY_ID]
10 changes: 8 additions & 2 deletions utils/measure/measure/powermeter/kasa.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@

from kasa import SmartPlug

from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter
from measure.powermeter.errors import UnsupportedFeatureError
from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter


class KasaPowerMeter(PowerMeter):
def __init__(self, device_ip: str) -> None:
self._smartplug = SmartPlug(device_ip)

def get_power(self) -> PowerMeasurementResult:
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
"""Get a new power reading from the Kasa device. Optionally include voltage (FIXME: not yet implemented)."""
if include_voltage:
# FIXME: Not yet implemented
raise UnsupportedFeatureError("Voltage measurement is not yet implemented for Kasa devices.")

loop = asyncio.get_event_loop()
power = loop.run_until_complete(self.async_read_power_meter())
return PowerMeasurementResult(power, time.time())
Expand Down
17 changes: 14 additions & 3 deletions utils/measure/measure/powermeter/manual.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,23 @@
import time
from typing import Any

from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter
from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter


class ManualPowerMeter(PowerMeter):
def get_power(self) -> PowerMeasurementResult:
power = input("Input power measurement:")
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
"""Manually enter power readings. Optionally enter voltage readings as well."""
if include_voltage:
power = input("Input power measurement: ")
voltage = input("Input voltage measurement: ")

return ExtendedPowerMeasurementResult(
power=float(power),
voltage=float(voltage),
updated=time.time(),
)

power = input("Input power measurement: ")
return PowerMeasurementResult(float(power), time.time())

def process_answers(self, answers: dict[str, Any]) -> None:
Expand Down
11 changes: 8 additions & 3 deletions utils/measure/measure/powermeter/mystrom.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@

import requests

from measure.powermeter.errors import PowerMeterError
from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter
from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError
from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter


class MyStromPowerMeter(PowerMeter):
def __init__(self, device_ip: str) -> None:
self._device_ip = device_ip

def get_power(self) -> PowerMeasurementResult:
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
"""Get a new power reading from the MyStrom device. Optionally include voltage (FIXME: not yet implemented)."""
if include_voltage:
# FIXME: Not yet implemented
raise UnsupportedFeatureError("Voltage measurement is not yet implemented for MyStrom devices.")

r = requests.get(
f"http://{self._device_ip}/report",
timeout=10,
Expand Down
9 changes: 7 additions & 2 deletions utils/measure/measure/powermeter/ocr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import os
from typing import Any

from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter
from measure.powermeter.errors import UnsupportedFeatureError
from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter


class OcrPowerMeter(PowerMeter):
Expand All @@ -16,7 +17,11 @@ def __init__(self) -> None:
self.file = open(filepath, "rb") # noqa: SIM115
super().__init__()

def get_power(self) -> PowerMeasurementResult:
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
"""Get a new power reading via OCR."""
if include_voltage:
raise UnsupportedFeatureError("Voltage measurement are not supported for OCR mode.")

last_line = self.read_last_line()
(timestamp, power) = last_line.strip().split(";")
power = float(power)
Expand Down
10 changes: 8 additions & 2 deletions utils/measure/measure/powermeter/powermeter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@

class PowerMeter(ABC):
@abstractmethod
def get_power(self) -> PowerMeasurementResult:
"""Get a power measurement from the meter"""
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
"""Get a power measurement from the meter. Optionally include voltage readings."""

def get_questions(self) -> list[Question]:
"""Get questions to ask for the chosen powermeter"""
Expand All @@ -23,3 +23,9 @@ def process_answers(self, answers: dict[str, Any]) -> None:
class PowerMeasurementResult(NamedTuple):
power: float
updated: float


class ExtendedPowerMeasurementResult(NamedTuple):
power: float
voltage: float
updated: float
22 changes: 18 additions & 4 deletions utils/measure/measure/powermeter/shelly.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import requests

from measure.powermeter.errors import ApiConnectionError
from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter
from measure.powermeter.errors import ApiConnectionError, UnsupportedFeatureError
from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter

_LOGGER = logging.getLogger("measure")

Expand Down Expand Up @@ -76,6 +76,13 @@ def _check_endpoint_availability(self, endpoint: str) -> bool:

return True

def parse_json_with_voltage(self, json: dict) -> ExtendedPowerMeasurementResult:

Check notice on line 79 in utils/measure/measure/powermeter/shelly.py

View workflow job for this annotation

GitHub Actions / qodana

Method is not declared static

Method `parse_json_with_voltage` may be 'static'
return ExtendedPowerMeasurementResult(
power=float(json["apower"]),
voltage=float(json["voltage"]),
updated=time.time(),
)


class ShellyPowerMeter(PowerMeter):
def __init__(self, shelly_ip: str, timeout: int = 5) -> None:
Expand All @@ -88,8 +95,8 @@ def __init__(self, shelly_ip: str, timeout: int = 5) -> None:
self.api = ShellyApiGen2Plus(self.ip_address, self.timeout)
self.api.check_gen2_plus_endpoints()

def get_power(self) -> PowerMeasurementResult:
"""Get a new power reading from the Shelly device"""
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
"""Get a new power reading from the Shelly device. Optionally include voltage."""
try:
r = requests.get(
f"http://{self.ip_address}{self.api.endpoint}",
Expand All @@ -100,6 +107,13 @@ def get_power(self) -> PowerMeasurementResult:
raise ApiConnectionError("Could not connect to Shelly Plug") from e

json = r.json()

if include_voltage:
if isinstance(self.api, ShellyApiGen1):
raise UnsupportedFeatureError("Voltage measurement is not supported on Shelly Gen1 devices")
if isinstance(self.api, ShellyApiGen2Plus):
return self.api.parse_json_with_voltage(json)

return self.api.parse_json(json)

def _detect_api_version(self) -> int:
Expand Down
11 changes: 8 additions & 3 deletions utils/measure/measure/powermeter/tasmota.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,20 @@

import requests

from measure.powermeter.errors import PowerMeterError
from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter
from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError
from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter


class TasmotaPowerMeter(PowerMeter):
def __init__(self, device_ip: str) -> None:
self._device_ip = device_ip

def get_power(self) -> PowerMeasurementResult:
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
"""Get a new power reading from the Tasmota device. Optionally include voltage (FIXME: not yet implemented)."""
if include_voltage:
# FIXME: Not yet implemented
raise UnsupportedFeatureError("Voltage measurement is not yet implemented for Tasmota devices.")

r = requests.get(
f"http://{self._device_ip}/cm?cmnd=STATUS+8",
timeout=10,
Expand Down
11 changes: 8 additions & 3 deletions utils/measure/measure/powermeter/tuya.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

import tuyapower

from measure.powermeter.errors import PowerMeterError
from measure.powermeter.powermeter import PowerMeasurementResult, PowerMeter
from measure.powermeter.errors import PowerMeterError, UnsupportedFeatureError
from measure.powermeter.powermeter import ExtendedPowerMeasurementResult, PowerMeasurementResult, PowerMeter

STATUS_OK = "OK"

Expand All @@ -24,7 +24,12 @@ def __init__(
self._device_key = device_key
self._device_version = device_version

def get_power(self) -> PowerMeasurementResult:
def get_power(self, include_voltage: bool = False) -> PowerMeasurementResult | ExtendedPowerMeasurementResult:
"""Get a new power reading from the Tuya device. Optionally include voltage (FIXME: not yet implemented)."""
if include_voltage:
# FIXME: Not yet implemented
raise UnsupportedFeatureError("Voltage measurement is not yet implemented for Tuya devices.")

(_, w, _, _, err) = tuyapower.deviceInfo(
self._device_id,
self._device_ip,
Expand Down
Loading

0 comments on commit 2edbbc2

Please sign in to comment.