From 7d3f1e25006d240594269faa00e90030fe60ab22 Mon Sep 17 00:00:00 2001 From: Bram Date: Thu, 2 Jan 2025 11:12:46 +0100 Subject: [PATCH 1/2] feat: implement auto populate multi switch entities --- custom_components/powercalc/config_flow.py | 114 ++++++++++----- custom_components/powercalc/sensors/power.py | 4 +- .../library/device-types/smart-switch.md | 1 + profile_library/tp-link/HS300/model.json | 2 + .../test_virtual_power_multi_switch.py | 137 ++++++++++++------ .../profiles/tp-link/HS300/model.json | 4 +- .../multi_switch/model.json | 4 +- 7 files changed, 180 insertions(+), 86 deletions(-) diff --git a/custom_components/powercalc/config_flow.py b/custom_components/powercalc/config_flow.py index b4f8d1d27..41f9a97dd 100644 --- a/custom_components/powercalc/config_flow.py +++ b/custom_components/powercalc/config_flow.py @@ -32,6 +32,7 @@ ) from homeassistant.core import callback from homeassistant.data_entry_flow import FlowResult +from homeassistant.helpers import entity_registry as er from homeassistant.helpers import selector from homeassistant.helpers.schema_config_entry_flow import SchemaFlowError from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType @@ -198,7 +199,7 @@ class Step(StrEnum): LIBRARY_URL = "https://library.powercalc.nl" UNIQUE_ID_TRACKED_UNTRACKED = "pc_tracked_untracked" -STRATEGY_STEP_MAPPING = { +STRATEGY_STEP_MAPPING: dict[CalculationStrategy, Step] = { CalculationStrategy.FIXED: Step.FIXED, CalculationStrategy.LINEAR: Step.LINEAR, CalculationStrategy.MULTI_SWITCH: Step.MULTI_SWITCH, @@ -206,7 +207,7 @@ class Step(StrEnum): CalculationStrategy.WLED: Step.WLED, } -GROUP_STEP_MAPPING = { +GROUP_STEP_MAPPING: dict[GroupType, Step] = { GroupType.CUSTOM: Step.GROUP_CUSTOM, GroupType.DOMAIN: Step.GROUP_DOMAIN, GroupType.STANDBY: Step.GROUP_DOMAIN, @@ -366,16 +367,8 @@ class Step(StrEnum): }, ) -SCHEMA_POWER_MULTI_SWITCH = vol.Schema( - { - vol.Required(CONF_ENTITIES): selector.EntitySelector( - selector.EntitySelectorConfig(domain=Platform.SWITCH, multiple=True), - ), - }, -) SCHEMA_POWER_MULTI_SWITCH_MANUAL = vol.Schema( { - **SCHEMA_POWER_MULTI_SWITCH.schema, vol.Required(CONF_POWER): vol.Coerce(float), vol.Required(CONF_POWER_OFF): vol.Coerce(float), }, @@ -552,13 +545,19 @@ class Step(StrEnum): }, ) -GROUP_SCHEMAS = { +GROUP_SCHEMAS: dict[GroupType, vol.Schema] = { GroupType.CUSTOM: SCHEMA_GROUP, GroupType.DOMAIN: SCHEMA_GROUP_DOMAIN, GroupType.SUBTRACT: SCHEMA_GROUP_SUBTRACT, GroupType.TRACKED_UNTRACKED: SCHEMA_GROUP_TRACKED_UNTRACKED, } +STRATEGY_SCHEMAS: dict[CalculationStrategy, vol.Schema] = { + CalculationStrategy.FIXED: SCHEMA_POWER_FIXED, + CalculationStrategy.PLAYBOOK: SCHEMA_POWER_PLAYBOOK, + CalculationStrategy.WLED: SCHEMA_POWER_WLED, +} + @dataclass(slots=True) class PowercalcFormStep: @@ -590,6 +589,7 @@ def __init__(self) -> None: self.is_library_flow: bool = False self.skip_advanced_step: bool = False self.is_options_flow: bool = isinstance(self, OptionsFlow) + self.strategy: CalculationStrategy | None = None self.name: str | None = None super().__init__() @@ -625,26 +625,72 @@ def validate_group_input(user_input: dict[str, Any] | None = None) -> None: if not any(key in (user_input or {}) for key in required_keys): raise SchemaFlowError("group_mandatory") - def create_strategy_schema(self, strategy: CalculationStrategy, source_entity_id: str) -> vol.Schema: + def create_strategy_schema(self) -> vol.Schema: """Get the config schema for a given power calculation strategy.""" - if strategy == CalculationStrategy.LINEAR: - return SCHEMA_POWER_LINEAR.extend( # type: ignore + if not self.strategy: + raise ValueError("No strategy selected") # pragma: no cover + + if hasattr(self, f"create_schema_{self.strategy.value.lower()}"): + return getattr(self, f"create_schema_{self.strategy.value.lower()}")() # type: ignore + + return STRATEGY_SCHEMAS[self.strategy] + + def create_schema_linear(self) -> vol.Schema: + """Create the config schema for linear strategy.""" + return SCHEMA_POWER_LINEAR.extend( # type: ignore + { + vol.Optional(CONF_ATTRIBUTE): selector.AttributeSelector( + selector.AttributeSelectorConfig( + entity_id=self.source_entity_id, # type: ignore + hide_attributes=[], + ), + ), + }, + ) + + def create_schema_multi_switch(self) -> vol.Schema: + """Create the config schema for multi switch strategy.""" + + entity_registry = er.async_get(self.hass) + entities: list[str] = [] + if self.source_entity and self.source_entity.device_entry: + entities = [ + entity.entity_id + for entity in entity_registry.entities.get_entries_for_device_id(self.source_entity.device_entry.id) + if entity.domain == Platform.SWITCH + ] + + # pre-populate the entity selector with the switches from the device + if entities: + schema = vol.Schema( + { + vol.Required( + CONF_ENTITIES, + description={"suggested_value": entities}, + ): selector.EntitySelector( + selector.EntitySelectorConfig( + domain=Platform.SWITCH, + multiple=True, + include_entities=entities, + ), + ), + }, + ) + else: + schema = vol.Schema( { - vol.Optional(CONF_ATTRIBUTE): selector.AttributeSelector( - selector.AttributeSelectorConfig( - entity_id=source_entity_id, - hide_attributes=[], + vol.Required(CONF_ENTITIES): selector.EntitySelector( + selector.EntitySelectorConfig( + domain=Platform.SWITCH, + multiple=True, ), ), }, ) - if strategy == CalculationStrategy.PLAYBOOK: - return SCHEMA_POWER_PLAYBOOK - if strategy == CalculationStrategy.MULTI_SWITCH: - return SCHEMA_POWER_MULTI_SWITCH if self.is_library_flow else SCHEMA_POWER_MULTI_SWITCH_MANUAL - if strategy == CalculationStrategy.WLED: - return SCHEMA_POWER_WLED - return SCHEMA_POWER_FIXED + if not self.is_library_flow: + schema = schema.extend(SCHEMA_POWER_MULTI_SWITCH_MANUAL.schema) + + return schema def create_schema_group_custom( self, @@ -772,12 +818,10 @@ def create_group_selector( def build_strategy_config( self, - strategy: CalculationStrategy, - source_entity_id: str, user_input: dict[str, Any], ) -> dict[str, Any]: """Build the config dict needed for the configured strategy.""" - strategy_schema = self.create_strategy_schema(strategy, source_entity_id) + strategy_schema = self.create_strategy_schema() strategy_options: dict[str, Any] = {} for key in strategy_schema.schema: if user_input.get(key) is None: @@ -1120,13 +1164,15 @@ async def handle_strategy_step( user_input: dict[str, Any] | None = None, validate: Callable[[dict[str, Any]], None] | None = None, ) -> FlowResult: + self.strategy = strategy + async def _validate(user_input: dict[str, Any]) -> dict[str, Any]: if validate: validate(user_input) await self.validate_strategy_config({strategy: user_input}) return {strategy: user_input} - schema = self.create_strategy_schema(strategy, self.source_entity_id) # type: ignore + schema = self.create_strategy_schema() return await self.handle_form_step( PowercalcFormStep( @@ -1634,7 +1680,7 @@ def __init__(self, config_entry: ConfigEntry) -> None: self.sensor_config = dict(config_entry.data) self.sensor_type: SensorType = self.sensor_config.get(CONF_SENSOR_TYPE) or SensorType.VIRTUAL_POWER self.source_entity_id: str = self.sensor_config.get(CONF_ENTITY_ID) # type: ignore - self.strategy: CalculationStrategy | None = self.sensor_config.get(CONF_MODE) + self.strategy = self.sensor_config.get(CONF_MODE) async def async_step_init( self, @@ -1856,7 +1902,7 @@ async def async_handle_strategy_options_step(self, user_input: dict[str, Any] | step = STRATEGY_STEP_MAPPING.get(self.strategy, Step.FIXED) - schema = self.create_strategy_schema(self.strategy, self.source_entity_id) + schema = self.create_strategy_schema() if self.selected_profile and self.selected_profile.device_type == DeviceType.SMART_SWITCH: schema = SCHEMA_POWER_SMART_SWITCH @@ -1905,11 +1951,7 @@ async def process_all_options(self, user_input: dict[str, Any], schema: vol.Sche self._process_user_input(user_input, SCHEMA_POWER_SMART_SWITCH) user_input = {CONF_POWER: user_input.get(CONF_POWER, 0)} - strategy_options = self.build_strategy_config( - self.strategy, - self.source_entity_id, - user_input or {}, - ) + strategy_options = self.build_strategy_config(user_input or {}) if self.strategy != CalculationStrategy.LUT: self.sensor_config.update({str(self.strategy): strategy_options}) diff --git a/custom_components/powercalc/sensors/power.py b/custom_components/powercalc/sensors/power.py index 3e8928e2e..909b684de 100644 --- a/custom_components/powercalc/sensors/power.py +++ b/custom_components/powercalc/sensors/power.py @@ -87,6 +87,7 @@ from custom_components.powercalc.helpers import evaluate_power from custom_components.powercalc.power_profile.factory import get_power_profile from custom_components.powercalc.power_profile.power_profile import ( + DiscoveryBy, PowerProfile, SubProfileSelectConfig, SubProfileSelector, @@ -505,7 +506,8 @@ async def _handle_source_entity_state_change( self._sleep_power_timer() self._sleep_power_timer = None - if self.source_entity == DUMMY_ENTITY_ID: + discovery_by = self._power_profile.discovery_by if self._power_profile else DiscoveryBy.ENTITY + if self.source_entity == DUMMY_ENTITY_ID and discovery_by == DiscoveryBy.ENTITY: state = State(self.source_entity, STATE_ON) if not state or not self._has_valid_state(state): diff --git a/docs/source/library/device-types/smart-switch.md b/docs/source/library/device-types/smart-switch.md index 64ac63bd6..11f196b84 100644 --- a/docs/source/library/device-types/smart-switch.md +++ b/docs/source/library/device-types/smart-switch.md @@ -71,6 +71,7 @@ Examples of this type of smart switch are: ```json { "calculation_strategy": "multi_switch", + "discovery_by": "device", "multi_switch_config": { "power": 0.8, "power_off": 0.25 diff --git a/profile_library/tp-link/HS300/model.json b/profile_library/tp-link/HS300/model.json index 58c8c1524..4fd1d4016 100644 --- a/profile_library/tp-link/HS300/model.json +++ b/profile_library/tp-link/HS300/model.json @@ -1,6 +1,7 @@ { "name": "Kasa Smart Wi-Fi Power Strip", "device_type": "smart_switch", + "discovery_by": "device", "calculation_strategy": "multi_switch", "measure_method": "manual", "measure_device": "Kill-a-Watt", @@ -14,5 +15,6 @@ "aliases": [ "HS300(US)" ], + "only_self_usage": true, "author": "Bram Gerritsen " } diff --git a/tests/config_flow/test_virtual_power_multi_switch.py b/tests/config_flow/test_virtual_power_multi_switch.py index 11138de06..773dc255b 100644 --- a/tests/config_flow/test_virtual_power_multi_switch.py +++ b/tests/config_flow/test_virtual_power_multi_switch.py @@ -1,15 +1,17 @@ from unittest.mock import AsyncMock +import voluptuous as vol from homeassistant import data_entry_flow from homeassistant.const import CONF_ENTITIES, CONF_ENTITY_ID, CONF_NAME, STATE_ON from homeassistant.core import HomeAssistant +from homeassistant.data_entry_flow import FlowResult from homeassistant.helpers import entity_registry as er from homeassistant.helpers.device_registry import DeviceEntry from homeassistant.helpers.entity_registry import RegistryEntry from pytest_homeassistant_custom_component.common import mock_device_registry, mock_registry from custom_components.powercalc import DiscoveryManager -from custom_components.powercalc.common import create_source_entity +from custom_components.powercalc.common import SourceEntity from custom_components.powercalc.config_flow import Step from custom_components.powercalc.const import ( CONF_MANUFACTURER, @@ -19,9 +21,12 @@ CONF_POWER, CONF_POWER_OFF, CONF_SENSOR_TYPE, + DUMMY_ENTITY_ID, CalculationStrategy, SensorType, ) +from custom_components.powercalc.power_profile.factory import get_power_profile +from custom_components.powercalc.power_profile.library import ModelInfo from tests.common import get_test_config_dir, run_powercalc_setup from tests.config_flow.common import ( DEFAULT_UNIQUE_ID, @@ -62,85 +67,64 @@ async def test_discovery_flow( mock_entity_with_model_information: MockEntityWithModel, ) -> None: hass.config.config_dir = get_test_config_dir() - manufacturer = "tp-link" - model = "HS300" - mock_entity_with_model_information( - "switch.test", - manufacturer, - model, - unique_id=DEFAULT_UNIQUE_ID, - ) + device_entry = mock_device_with_switches(hass, 2, "tp-link", "HS300") - source_entity = await create_source_entity("switch.test", hass) - result = await initialize_discovery_flow(hass, source_entity, confirm_autodiscovered_model=True) + result = await initialize_device_discovery_flow(hass, device_entry) assert result["type"] == data_entry_flow.FlowResultType.FORM assert result["step_id"] == Step.MULTI_SWITCH result = await hass.config_entries.flow.async_configure( result["flow_id"], - {CONF_ENTITIES: ["switch.a", "switch.b"]}, + {CONF_ENTITIES: ["switch.test1", "switch.test2"]}, ) assert result["type"] == data_entry_flow.FlowResultType.CREATE_ENTRY assert result["data"] == { - CONF_ENTITY_ID: "switch.test", + CONF_ENTITY_ID: DUMMY_ENTITY_ID, CONF_SENSOR_TYPE: SensorType.VIRTUAL_POWER, - CONF_MANUFACTURER: manufacturer, - CONF_MODEL: model, - CONF_NAME: "test", + CONF_MANUFACTURER: "tp-link", + CONF_MODEL: "HS300", + CONF_NAME: "Test", CONF_MULTI_SWITCH: { - CONF_ENTITIES: ["switch.a", "switch.b"], + CONF_ENTITIES: ["switch.test1", "switch.test2"], }, } assert hass.states.get("sensor.test_device_power") - hass.states.async_set("switch.a", STATE_ON) + hass.states.async_set("switch.test1", STATE_ON) await hass.async_block_till_done() assert hass.states.get("sensor.test_device_power").state == "0.82" - hass.states.async_set("switch.b", STATE_ON) + hass.states.async_set("switch.test2", STATE_ON) await hass.async_block_till_done() assert hass.states.get("sensor.test_device_power").state == "1.64" +async def test_switch_entities_automatically_populated_from_device(hass: HomeAssistant) -> None: + """When setting up multi switch, the switch entities should be automatically populated from the device.""" + device_entry = mock_device_with_switches(hass, 4, "tp-link", "HS300") + + result = await initialize_device_discovery_flow(hass, device_entry) + + assert result["type"] == data_entry_flow.FlowResultType.FORM + assert result["step_id"] == Step.MULTI_SWITCH + + schema_keys: list[vol.Marker] = list(result["data_schema"].schema.keys()) + field_desc = schema_keys[schema_keys.index(vol.Marker(CONF_ENTITIES))].description + assert field_desc == {"suggested_value": ["switch.test1", "switch.test2", "switch.test3", "switch.test4"]} + + async def test_discovery_flow_once_per_unique_device( hass: HomeAssistant, mock_flow_init: AsyncMock, ) -> None: hass.config.config_dir = get_test_config_dir() - device_id = "abcdef" - mock_device_registry( - hass, - { - device_id: DeviceEntry( - id=device_id, - manufacturer="tp-link", - model="HS300", - ), - }, - ) - - entities: dict[str, RegistryEntry] = {} - for i in range(6): - entity_id = f"switch.test{i}" - entry = RegistryEntry( - id=entity_id, - entity_id=entity_id, - unique_id=f"{device_id}{i}", - device_id=device_id, - platform="switch", - ) - entities[entity_id] = entry - - mock_registry( - hass, - entities, - ) + mock_device_with_switches(hass, 6, "tp-link", "HS300") discovery_manager = DiscoveryManager(hass, {}) await discovery_manager.start_discovery() @@ -204,3 +188,62 @@ async def test_regression_2612(hass: HomeAssistant, mock_entity_with_model_infor assert hass.states.get("sensor.foo_bar_power") assert hass.states.get("sensor.foo_bar_energy") + + +async def initialize_device_discovery_flow(hass: HomeAssistant, device_entry: DeviceEntry) -> FlowResult: + source_entity = SourceEntity( + object_id=device_entry.name, + name=device_entry.name, + entity_id=DUMMY_ENTITY_ID, + domain="sensor", + device_entry=device_entry, + ) + + power_profiles = [ + await get_power_profile( + hass, + {}, + ModelInfo(device_entry.manufacturer, device_entry.model), + ), + ] + return await initialize_discovery_flow( + hass, + source_entity, + power_profiles, + confirm_autodiscovered_model=True, + ) + + +def mock_device_with_switches(hass: HomeAssistant, num_switches: int = 2, manufacturer: str = "test", model: str = "test") -> DeviceEntry: + device_id = "abcdef" + device_entry = DeviceEntry( + id=device_id, + manufacturer=manufacturer, + model=model, + name="Test", + ) + mock_device_registry( + hass, + { + device_id: device_entry, + }, + ) + + entities: dict[str, RegistryEntry] = {} + for i in range(num_switches): + entity_id = f"switch.test{i + 1}" + entry = RegistryEntry( + id=entity_id, + entity_id=entity_id, + unique_id=f"{device_id}{i + 1}", + device_id=device_id, + platform="switch", + ) + entities[entity_id] = entry + + mock_registry( + hass, + entities, + ) + + return device_entry diff --git a/tests/testing_config/powercalc/profiles/tp-link/HS300/model.json b/tests/testing_config/powercalc/profiles/tp-link/HS300/model.json index b8f56c397..8a742a53d 100644 --- a/tests/testing_config/powercalc/profiles/tp-link/HS300/model.json +++ b/tests/testing_config/powercalc/profiles/tp-link/HS300/model.json @@ -5,6 +5,7 @@ "hs500" ], "device_type": "smart_switch", + "discovery_by": "device", "calculation_strategy": "multi_switch", "multi_switch_config": { "power": 0.82, @@ -12,5 +13,6 @@ }, "sensor_config": { "power_sensor_naming": "{} Device Power" - } + }, + "only_self_usage": true } diff --git a/tests/testing_config/powercalc_profiles/multi_switch/model.json b/tests/testing_config/powercalc_profiles/multi_switch/model.json index aa8d517b6..da1f1421b 100644 --- a/tests/testing_config/powercalc_profiles/multi_switch/model.json +++ b/tests/testing_config/powercalc_profiles/multi_switch/model.json @@ -1,5 +1,6 @@ { "name": "Kasa Smart Wi-Fi Power Strip", + "discovery_by": "device", "device_type": "smart_switch", "calculation_strategy": "multi_switch", "multi_switch_config": { @@ -8,5 +9,6 @@ }, "sensor_config": { "power_sensor_naming": "{} Device Power" - } + }, + "only_self_usage": true } From a271adee2cf4c3551fe0e2e65d342e689f9d99ab Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 2 Jan 2025 10:13:07 +0000 Subject: [PATCH 2/2] chore(lint): [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 2e2524c72..171412ecd 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ # -**PowerCalc** is a versatile custom component for Home Assistant that estimates power consumption for devices like lights, fans, smart speakers, and more—especially those without built-in power meters. +**PowerCalc** is a versatile custom component for Home Assistant that estimates power consumption for devices like lights, fans, smart speakers, and more—especially those without built-in power meters. It acts as a virtual energy monitor, using advanced strategies to calculate power usage. For light entities, PowerCalc analyzes factors such as brightness, hue, saturation, and color temperature to deliver accurate consumption estimates. For other devices, it offers extensive configuration possibilities. Additionally, PowerCalc includes a powerful measurement utility, enabling users to assess their devices' power usage and contribute custom power profiles to the growing PowerCalc library.