From a7aae819928383f3754c98868c6feae18e266ece Mon Sep 17 00:00:00 2001 From: sususweet Date: Wed, 18 Dec 2024 21:52:06 +0800 Subject: [PATCH] use DataUpdateCoordinator for device state polling to reduce parallel requests to Deye official cloud server (#47) * feat: add control support for new protocol like DYD-T22A3, DYD-D50A3 * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * refactor: use `DataUpdateCoordinator` for device state polling to reduce parallel requests to Deye official cloud server * fix: improve for fan entity to avoid FanEntityFeature warnings. * feat: display device online status * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * refactor: fix imports * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .../deye_dehumidifier/__init__.py | 119 +++++------------- custom_components/deye_dehumidifier/const.py | 1 + .../deye_dehumidifier/data_coordinator.py | 106 ++++++++++++++++ custom_components/deye_dehumidifier/fan.py | 23 ++-- .../deye_dehumidifier/humidifier.py | 71 +++++++++-- custom_components/deye_dehumidifier/switch.py | 12 +- 6 files changed, 226 insertions(+), 106 deletions(-) create mode 100644 custom_components/deye_dehumidifier/data_coordinator.py diff --git a/custom_components/deye_dehumidifier/__init__.py b/custom_components/deye_dehumidifier/__init__.py index a05e146..301d098 100644 --- a/custom_components/deye_dehumidifier/__init__.py +++ b/custom_components/deye_dehumidifier/__init__.py @@ -2,22 +2,19 @@ from __future__ import annotations -from datetime import datetime - from homeassistant.config_entries import ConfigEntry from homeassistant.const import Platform -from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback +from homeassistant.core import HomeAssistant, callback from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady from homeassistant.helpers.aiohttp_client import async_get_clientsession from homeassistant.helpers.entity import DeviceInfo, Entity -from homeassistant.helpers.event import async_call_later +from homeassistant.helpers.update_coordinator import CoordinatorEntity from libdeye.cloud_api import ( DeyeCloudApi, DeyeCloudApiCannotConnectError, DeyeCloudApiInvalidAuthError, ) -from libdeye.const import QUERY_DEVICE_STATE_COMMAND -from libdeye.device_state_command import DeyeDeviceCommand, DeyeDeviceState +from libdeye.device_state_command import DeyeDeviceState from libdeye.mqtt_client import DeyeMqttClient from libdeye.types import DeyeApiResponseDeviceInfo @@ -26,11 +23,13 @@ CONF_PASSWORD, CONF_USERNAME, DATA_CLOUD_API, + DATA_COORDINATOR, DATA_DEVICE_LIST, DATA_MQTT_CLIENT, DOMAIN, MANUFACTURER, ) +from .data_coordinator import DeyeDataUpdateCoordinator PLATFORMS: list[Platform] = [ Platform.HUMIDIFIER, @@ -72,6 +71,13 @@ def on_auth_token_refreshed(auth_token: str) -> None: await cloud_api.get_device_list(), ) ) + for device in device_list: + coordinator = DeyeDataUpdateCoordinator( + hass, device, mqtt_client, cloud_api + ) + device[DATA_COORDINATOR] = coordinator + await device[DATA_COORDINATOR].async_config_entry_first_refresh() + except DeyeCloudApiInvalidAuthError as err: raise ConfigEntryAuthFailed from err except DeyeCloudApiCannotConnectError as err: @@ -99,7 +105,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool: return unload_ok -class DeyeEntity(Entity): +class DeyeEntity(CoordinatorEntity, Entity): """Initiate Deye Base Class.""" def __init__( @@ -109,11 +115,13 @@ def __init__( cloud_api: DeyeCloudApi, ) -> None: """Initialize the instance.""" + self.coordinator = device[DATA_COORDINATOR] + super().__init__(self.coordinator) self._device = device self._mqtt_client = mqtt_client self._cloud_api = cloud_api self._attr_has_entity_name = True - self._attr_available = self._device["online"] + self._device_available = self._device["online"] self._attr_unique_id = self._device["mac"] self.entity_id_base = f'deye_{self._device["mac"].lower()}' # We will override HA generated entity ID self._attr_device_info = DeviceInfo( @@ -123,7 +131,6 @@ def __init__( name=self._device["device_name"], ) self._attr_should_poll = False - self.subscription_muted: CALLBACK_TYPE | None = None # payload from the server sometimes are not a valid string if isinstance(self._device["payload"], str): self.device_state = DeyeDeviceState(self._device["payload"]) @@ -131,84 +138,26 @@ def __init__( self.device_state = DeyeDeviceState( "1411000000370000000000000000003C3C0000000000" # 20°C/60%RH as the default state ) + remove_handle = self.coordinator.async_add_listener( + self._handle_coordinator_update + ) + self.async_on_remove(remove_handle) - def update_device_availability(self, available: bool) -> None: - """Will be called when received new availability status.""" - if self.subscription_muted: - return - self._attr_available = available - self.async_write_ha_state() - - def update_device_state(self, state: DeyeDeviceState) -> None: - """Will be called when received new DeyeDeviceState.""" - if self.subscription_muted: - return - self.device_state = state + async def publish_command_async(self, attribute, value): + """Push command to a queue and deal with them together.""" self.async_write_ha_state() + self.hass.bus.fire( + "call_humidifier_method", {"prop": attribute, "value": value} + ) + await self.coordinator.async_request_refresh() - async def async_added_to_hass(self) -> None: - """When entity is added to Home Assistant.""" - if self._device["platform"] == 1: - self.async_on_remove( - self._mqtt_client.subscribe_availability_change( - self._device["product_id"], - self._device["device_id"], - self.update_device_availability, - ) - ) - self.async_on_remove( - self._mqtt_client.subscribe_state_change( - self._device["product_id"], - self._device["device_id"], - self.update_device_state, - ) - ) - - await self.poll_device_state() - self.async_on_remove(self.cancel_polling) + @property + def available(self): + return self._device_available @callback - async def poll_device_state(self, now: datetime | None = None) -> None: - """ - Some Deye devices have a very long heartbeat period. So polling is still necessary to get the latest state as - quickly as possible. - """ - if self._device["platform"] == 1: - self._mqtt_client.publish_command( - self._device["product_id"], - self._device["device_id"], - QUERY_DEVICE_STATE_COMMAND, - ) - elif self._device["platform"] == 2: - state = DeyeDeviceState( - await self._cloud_api.get_fog_platform_device_properties( - self._device["device_id"] - ) - ) - self.update_device_state(state) - self.cancel_polling = async_call_later(self.hass, 10, self.poll_device_state) - - def mute_subscription_for_a_while(self) -> None: - """Mute subscription for a while to avoid state bouncing.""" - if self.subscription_muted: - self.subscription_muted() - - @callback - def unmute(now: datetime) -> None: - self.subscription_muted = None - - self.subscription_muted = async_call_later(self.hass, 10, unmute) - - async def publish_command(self, command: DeyeDeviceCommand) -> None: - if self._device["platform"] == 1: - """Publish a MQTT command to this device.""" - self._mqtt_client.publish_command( - self._device["product_id"], self._device["device_id"], command.bytes() - ) - elif self._device["platform"] == 2: - """Publish a MQTT command to this device.""" - await self._cloud_api.set_fog_platform_device_properties( - self._device["device_id"], command.json() - ) - self.async_write_ha_state() - self.mute_subscription_for_a_while() + def _handle_coordinator_update(self) -> None: + """Handle updated data from the coordinator.""" + self.device_state = self.coordinator.data + self._device_available = self.coordinator.device_available + super()._handle_coordinator_update() diff --git a/custom_components/deye_dehumidifier/const.py b/custom_components/deye_dehumidifier/const.py index 1914dd0..bdccf93 100644 --- a/custom_components/deye_dehumidifier/const.py +++ b/custom_components/deye_dehumidifier/const.py @@ -7,4 +7,5 @@ DATA_CLOUD_API = "cloud_api" DATA_MQTT_CLIENT = "mqtt_client" DATA_DEVICE_LIST = "device_list" +DATA_COORDINATOR = "coordinator" MANUFACTURER = "Ningbo Deye Technology Co., Ltd" diff --git a/custom_components/deye_dehumidifier/data_coordinator.py b/custom_components/deye_dehumidifier/data_coordinator.py new file mode 100644 index 0000000..c592252 --- /dev/null +++ b/custom_components/deye_dehumidifier/data_coordinator.py @@ -0,0 +1,106 @@ +import asyncio +import logging +from datetime import datetime, timedelta + +from homeassistant.core import CALLBACK_TYPE, callback +from homeassistant.helpers.event import async_call_later +from homeassistant.helpers.update_coordinator import DataUpdateCoordinator +from libdeye.const import QUERY_DEVICE_STATE_COMMAND +from libdeye.device_state_command import DeyeDeviceState + +_LOGGER = logging.getLogger(__name__) + + +class DeyeDataUpdateCoordinator(DataUpdateCoordinator): + def __init__(self, hass, device, mqtt_client, cloud_api): + super().__init__( + hass, + _LOGGER, + name="deye_data_update_coordinator", + update_method=self.poll_device_state, + update_interval=timedelta(seconds=10), + ) + self._mqtt_client = mqtt_client + self._cloud_api = cloud_api + self.subscription_muted: CALLBACK_TYPE | None = None + + self.data = DeyeDeviceState( + "1411000000370000000000000000003C3C0000000000" # 20°C/60%RH as the default state + ) + self._device = device + self.device_available = self._device["online"] + """When entity is added to Home Assistant.""" + if self._device["platform"] == 1: + # self._mqtt_client.subscribe_availability_change( + # self._device["product_id"], + # self._device["device_id"], + # self.update_device_availability, + # ) + self._mqtt_client.subscribe_state_change( + self._device["product_id"], + self._device["device_id"], + self.update_device_state, + ) + + self.receive_queue = asyncio.Queue() + self.device_available_queue = asyncio.Queue() + + def mute_subscription_for_a_while(self) -> None: + """Mute subscription for a while to avoid state bouncing.""" + if self.subscription_muted: + self.subscription_muted() + + @callback + def unmute(now: datetime) -> None: + self.subscription_muted = None + + self.subscription_muted = async_call_later(self.hass, 20, unmute) + + def update_device_state(self, state: DeyeDeviceState) -> None: + """Will be called when received new DeyeDeviceState.""" + self.receive_queue.put_nowait(state) + # self.async_set_updated_data(state) + + async def async_request_refresh(self) -> None: + self.mute_subscription_for_a_while() + await super().async_request_refresh() + + async def poll_device_state(self) -> DeyeDeviceState: + """ + Some Deye devices have a very long heartbeat period. So polling is still necessary to get the latest state as + quickly as possible. + """ + # _LOGGER.error("poll_device_state called: " + str(self._device["product_id"])) + if self.subscription_muted: + return self.data + + device_list = list( + filter( + lambda d: d["product_type"] == "dehumidifier" + and d["device_id"] == self._device["device_id"], + await self._cloud_api.get_device_list(), + ) + ) + if len(device_list) > 0: + device = device_list[0] + self.device_available = device["online"] + + if self._device["platform"] == 1: + self._mqtt_client.publish_command( + self._device["product_id"], + self._device["device_id"], + QUERY_DEVICE_STATE_COMMAND, + ) + response = await asyncio.wait_for( + self.receive_queue.get(), timeout=10 + ) # 设置超时时间 + # _LOGGER.error(response.to_command().json()) + return response + elif self._device["platform"] == 2: + response = DeyeDeviceState( + await self._cloud_api.get_fog_platform_device_properties( + self._device["device_id"] + ) + ) + # _LOGGER.error(response.to_command().json()) + return response diff --git a/custom_components/deye_dehumidifier/fan.py b/custom_components/deye_dehumidifier/fan.py index 5b17c6b..d8cc579 100644 --- a/custom_components/deye_dehumidifier/fan.py +++ b/custom_components/deye_dehumidifier/fan.py @@ -55,6 +55,10 @@ def __init__( self.entity_id = f"fan.{self.entity_id_base}_fan" feature_config = get_product_feature_config(device["product_id"]) self._attr_supported_features = FanEntityFeature.SET_SPEED + if hasattr(FanEntityFeature, "TURN_ON"): # v2024.8 + self._attr_supported_features |= FanEntityFeature.TURN_ON + if hasattr(FanEntityFeature, "TURN_OFF"): + self._attr_supported_features |= FanEntityFeature.TURN_OFF if feature_config["oscillating"]: self._attr_supported_features |= FanEntityFeature.OSCILLATE self._named_fan_speeds = feature_config["fan_speed"] @@ -82,16 +86,17 @@ def percentage(self) -> int: async def async_oscillate(self, oscillating: bool) -> None: """Oscillate the fan.""" self.device_state.oscillating_switch = oscillating - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("oscillating_switch", oscillating) async def async_set_percentage(self, percentage: int) -> None: """Set the speed of the fan, as a percentage.""" if percentage == 0: await self.async_turn_off() - self.device_state.fan_speed = percentage_to_ordered_list_item( - self._named_fan_speeds, percentage + fan_speed = int( + percentage_to_ordered_list_item(self._named_fan_speeds, percentage) ) - await self.publish_command(self.device_state.to_command()) + self.device_state.fan_speed = fan_speed + await self.publish_command_async("fan_speed", fan_speed) async def async_turn_on( self, @@ -101,13 +106,15 @@ async def async_turn_on( ) -> None: """Turn on the fan.""" self.device_state.power_switch = True + await self.publish_command_async("power_switch", True) if percentage is not None: - self.device_state.fan_speed = percentage_to_ordered_list_item( - self._named_fan_speeds, percentage + fan_speed = int( + percentage_to_ordered_list_item(self._named_fan_speeds, percentage) ) - await self.publish_command(self.device_state.to_command()) + self.device_state.fan_speed = fan_speed + await self.publish_command_async("fan_speed", fan_speed) async def async_turn_off(self, **kwargs: Any) -> None: """Turn the entity off.""" self.device_state.power_switch = False - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("power_switch", False) diff --git a/custom_components/deye_dehumidifier/humidifier.py b/custom_components/deye_dehumidifier/humidifier.py index 3f25321..7dcea50 100644 --- a/custom_components/deye_dehumidifier/humidifier.py +++ b/custom_components/deye_dehumidifier/humidifier.py @@ -2,6 +2,7 @@ from __future__ import annotations +from datetime import datetime, timedelta from typing import Any from homeassistant.components.humidifier import ( @@ -11,9 +12,10 @@ ) from homeassistant.components.humidifier.const import MODE_AUTO, MODE_SLEEP from homeassistant.config_entries import ConfigEntry -from homeassistant.core import HomeAssistant +from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback from homeassistant.helpers.entity_platform import AddEntitiesCallback from libdeye.cloud_api import DeyeCloudApi +from libdeye.device_state_command import DeyeDeviceState from libdeye.mqtt_client import DeyeMqttClient from libdeye.types import DeyeApiResponseDeviceInfo, DeyeDeviceMode from libdeye.utils import get_product_feature_config @@ -35,9 +37,17 @@ async def async_setup_entry( data = hass.data[DOMAIN][config_entry.entry_id] for device in data[DATA_DEVICE_LIST]: - async_add_entities( - [DeyeDehumidifier(device, data[DATA_MQTT_CLIENT], data[DATA_CLOUD_API])] + deye_dehumidifier = DeyeDehumidifier( + device, data[DATA_MQTT_CLIENT], data[DATA_CLOUD_API] ) + async_add_entities([deye_dehumidifier]) + + async def call_method(event): + prop = event.data.get("prop") + value = event.data.get("value") + await deye_dehumidifier.publish_command(prop, value) + + hass.bus.async_listen("call_humidifier_method", call_method) class DeyeDehumidifier(DeyeEntity, HumidifierEntity): @@ -56,6 +66,7 @@ def __init__( """Initialize the humidifier entity.""" super().__init__(device, mqtt_client, cloud_api) assert self._attr_unique_id is not None + self.subscription_muted: CALLBACK_TYPE | None = None self._attr_unique_id += "-dehumidifier" self.entity_id = f"humidifier.{self.entity_id_base}_dehumidifier" feature_config = get_product_feature_config(device["product_id"]) @@ -67,6 +78,43 @@ def __init__( self._attr_min_humidity = feature_config["min_target_humidity"] self._attr_max_humidity = feature_config["max_target_humidity"] self._attr_entity_picture = device["product_icon"] + self.data_change_list: dict = dict() + + async def async_added_to_hass(self) -> None: + await super().async_added_to_hass() + self.hass.helpers.event.async_track_time_interval( + self.put_device_state, timedelta(seconds=5) + ) + + @callback + async def put_device_state(self, now: datetime | None = None) -> None: + # _LOGGER.error(self.data_change_list) + if len(self.data_change_list.items()) > 0: + command = self.device_state.to_command() + for prop, value in self.data_change_list.items(): + set_class_variable(command, prop, value) + self.data_change_list.clear() + if self._device["platform"] == 1: + """Publish a MQTT command to this device.""" + self._mqtt_client.publish_command( + self._device["product_id"], + self._device["device_id"], + command.bytes(), + ) + elif self._device["platform"] == 2: + """Post a Remote command to this device.""" + await self._cloud_api.set_fog_platform_device_properties( + self._device["device_id"], command.json() + ) + + self.async_write_ha_state() + + async def publish_command(self, prop, value) -> None: + self.data_change_list[prop] = value + + @property + def get_device_state(self) -> DeyeDeviceState: + return self.device_state @property def target_humidity(self) -> int: @@ -108,22 +156,31 @@ def action(self) -> str: async def async_set_mode(self, mode: str) -> None: """Set new working mode.""" self.device_state.mode = hass_mode_to_deye_mode(mode) - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("mode", hass_mode_to_deye_mode(mode)) async def async_set_humidity(self, humidity: int) -> None: """Set new target humidity.""" self.device_state.target_humidity = humidity - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("target_humidity", humidity) async def async_turn_on(self, **kwargs: Any) -> None: """Turn the device on.""" self.device_state.power_switch = True - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("power_switch", True) async def async_turn_off(self, **kwargs: Any) -> None: """Turn the device off.""" self.device_state.power_switch = False - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("power_switch", False) + + +def set_class_variable(obj, var_name, new_value): + if hasattr(obj, var_name): + setattr(obj, var_name, new_value) + else: + raise AttributeError( + f"'{obj.__class__.__name__}' object has no attribute '{var_name}'" + ) def deye_mode_to_hass_mode(mode: DeyeDeviceMode) -> str: diff --git a/custom_components/deye_dehumidifier/switch.py b/custom_components/deye_dehumidifier/switch.py index 20d5090..4501ca7 100644 --- a/custom_components/deye_dehumidifier/switch.py +++ b/custom_components/deye_dehumidifier/switch.py @@ -72,12 +72,12 @@ def is_on(self) -> bool: async def async_turn_on(self, **kwargs: Any) -> None: """Turn the child lock on.""" self.device_state.child_lock_switch = True - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("child_lock_switch", True) async def async_turn_off(self, **kwargs: Any) -> None: """Turn the child lock off.""" self.device_state.child_lock_switch = False - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("child_lock_switch", False) class DeyeAnionSwitch(DeyeEntity, SwitchEntity): @@ -107,12 +107,12 @@ def is_on(self) -> bool: async def async_turn_on(self, **kwargs: Any) -> None: """Turn the anion switch on.""" self.device_state.anion_switch = True - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("anion_switch", True) async def async_turn_off(self, **kwargs: Any) -> None: """Turn the anion switch off.""" self.device_state.anion_switch = False - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("anion_switch", False) class DeyeWaterPumpSwitch(DeyeEntity, SwitchEntity): @@ -142,9 +142,9 @@ def is_on(self) -> bool: async def async_turn_on(self, **kwargs: Any) -> None: """Turn the water pump on.""" self.device_state.water_pump_switch = True - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("water_pump_switch", True) async def async_turn_off(self, **kwargs: Any) -> None: """Turn the water pump off.""" self.device_state.water_pump_switch = False - await self.publish_command(self.device_state.to_command()) + await self.publish_command_async("water_pump_switch", False)