Skip to content

Commit

Permalink
use DataUpdateCoordinator for device state polling to reduce parallel…
Browse files Browse the repository at this point in the history
… requests to Deye official cloud server (#47)

* feat: add control support for new protocol like DYD-T22A3, DYD-D50A3

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor: use `DataUpdateCoordinator` for device state polling to reduce parallel requests to Deye official cloud server

* fix: improve for fan entity to avoid FanEntityFeature warnings.

* feat: display device online status

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor: fix imports

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
sususweet and pre-commit-ci[bot] authored Dec 18, 2024
1 parent 558131d commit a7aae81
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 106 deletions.
119 changes: 34 additions & 85 deletions custom_components/deye_dehumidifier/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,19 @@

from __future__ import annotations

from datetime import datetime

from homeassistant.config_entries import ConfigEntry
from homeassistant.const import Platform
from homeassistant.core import CALLBACK_TYPE, HomeAssistant, callback
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryAuthFailed, ConfigEntryNotReady
from homeassistant.helpers.aiohttp_client import async_get_clientsession
from homeassistant.helpers.entity import DeviceInfo, Entity
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.update_coordinator import CoordinatorEntity
from libdeye.cloud_api import (
DeyeCloudApi,
DeyeCloudApiCannotConnectError,
DeyeCloudApiInvalidAuthError,
)
from libdeye.const import QUERY_DEVICE_STATE_COMMAND
from libdeye.device_state_command import DeyeDeviceCommand, DeyeDeviceState
from libdeye.device_state_command import DeyeDeviceState
from libdeye.mqtt_client import DeyeMqttClient
from libdeye.types import DeyeApiResponseDeviceInfo

Expand All @@ -26,11 +23,13 @@
CONF_PASSWORD,
CONF_USERNAME,
DATA_CLOUD_API,
DATA_COORDINATOR,
DATA_DEVICE_LIST,
DATA_MQTT_CLIENT,
DOMAIN,
MANUFACTURER,
)
from .data_coordinator import DeyeDataUpdateCoordinator

PLATFORMS: list[Platform] = [
Platform.HUMIDIFIER,
Expand Down Expand Up @@ -72,6 +71,13 @@ def on_auth_token_refreshed(auth_token: str) -> None:
await cloud_api.get_device_list(),
)
)
for device in device_list:
coordinator = DeyeDataUpdateCoordinator(
hass, device, mqtt_client, cloud_api
)
device[DATA_COORDINATOR] = coordinator
await device[DATA_COORDINATOR].async_config_entry_first_refresh()

except DeyeCloudApiInvalidAuthError as err:
raise ConfigEntryAuthFailed from err
except DeyeCloudApiCannotConnectError as err:
Expand Down Expand Up @@ -99,7 +105,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
return unload_ok


class DeyeEntity(Entity):
class DeyeEntity(CoordinatorEntity, Entity):
"""Initiate Deye Base Class."""

def __init__(
Expand All @@ -109,11 +115,13 @@ def __init__(
cloud_api: DeyeCloudApi,
) -> None:
"""Initialize the instance."""
self.coordinator = device[DATA_COORDINATOR]
super().__init__(self.coordinator)
self._device = device
self._mqtt_client = mqtt_client
self._cloud_api = cloud_api
self._attr_has_entity_name = True
self._attr_available = self._device["online"]
self._device_available = self._device["online"]
self._attr_unique_id = self._device["mac"]
self.entity_id_base = f'deye_{self._device["mac"].lower()}' # We will override HA generated entity ID
self._attr_device_info = DeviceInfo(
Expand All @@ -123,92 +131,33 @@ def __init__(
name=self._device["device_name"],
)
self._attr_should_poll = False
self.subscription_muted: CALLBACK_TYPE | None = None
# payload from the server sometimes are not a valid string
if isinstance(self._device["payload"], str):
self.device_state = DeyeDeviceState(self._device["payload"])
else:
self.device_state = DeyeDeviceState(
"1411000000370000000000000000003C3C0000000000" # 20°C/60%RH as the default state
)
remove_handle = self.coordinator.async_add_listener(
self._handle_coordinator_update
)
self.async_on_remove(remove_handle)

def update_device_availability(self, available: bool) -> None:
"""Will be called when received new availability status."""
if self.subscription_muted:
return
self._attr_available = available
self.async_write_ha_state()

def update_device_state(self, state: DeyeDeviceState) -> None:
"""Will be called when received new DeyeDeviceState."""
if self.subscription_muted:
return
self.device_state = state
async def publish_command_async(self, attribute, value):
"""Push command to a queue and deal with them together."""
self.async_write_ha_state()
self.hass.bus.fire(
"call_humidifier_method", {"prop": attribute, "value": value}
)
await self.coordinator.async_request_refresh()

async def async_added_to_hass(self) -> None:
"""When entity is added to Home Assistant."""
if self._device["platform"] == 1:
self.async_on_remove(
self._mqtt_client.subscribe_availability_change(
self._device["product_id"],
self._device["device_id"],
self.update_device_availability,
)
)
self.async_on_remove(
self._mqtt_client.subscribe_state_change(
self._device["product_id"],
self._device["device_id"],
self.update_device_state,
)
)

await self.poll_device_state()
self.async_on_remove(self.cancel_polling)
@property
def available(self):
return self._device_available

@callback
async def poll_device_state(self, now: datetime | None = None) -> None:
"""
Some Deye devices have a very long heartbeat period. So polling is still necessary to get the latest state as
quickly as possible.
"""
if self._device["platform"] == 1:
self._mqtt_client.publish_command(
self._device["product_id"],
self._device["device_id"],
QUERY_DEVICE_STATE_COMMAND,
)
elif self._device["platform"] == 2:
state = DeyeDeviceState(
await self._cloud_api.get_fog_platform_device_properties(
self._device["device_id"]
)
)
self.update_device_state(state)
self.cancel_polling = async_call_later(self.hass, 10, self.poll_device_state)

def mute_subscription_for_a_while(self) -> None:
"""Mute subscription for a while to avoid state bouncing."""
if self.subscription_muted:
self.subscription_muted()

@callback
def unmute(now: datetime) -> None:
self.subscription_muted = None

self.subscription_muted = async_call_later(self.hass, 10, unmute)

async def publish_command(self, command: DeyeDeviceCommand) -> None:
if self._device["platform"] == 1:
"""Publish a MQTT command to this device."""
self._mqtt_client.publish_command(
self._device["product_id"], self._device["device_id"], command.bytes()
)
elif self._device["platform"] == 2:
"""Publish a MQTT command to this device."""
await self._cloud_api.set_fog_platform_device_properties(
self._device["device_id"], command.json()
)
self.async_write_ha_state()
self.mute_subscription_for_a_while()
def _handle_coordinator_update(self) -> None:
"""Handle updated data from the coordinator."""
self.device_state = self.coordinator.data
self._device_available = self.coordinator.device_available
super()._handle_coordinator_update()
1 change: 1 addition & 0 deletions custom_components/deye_dehumidifier/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@
DATA_CLOUD_API = "cloud_api"
DATA_MQTT_CLIENT = "mqtt_client"
DATA_DEVICE_LIST = "device_list"
DATA_COORDINATOR = "coordinator"
MANUFACTURER = "Ningbo Deye Technology Co., Ltd"
106 changes: 106 additions & 0 deletions custom_components/deye_dehumidifier/data_coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import asyncio
import logging
from datetime import datetime, timedelta

from homeassistant.core import CALLBACK_TYPE, callback
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator
from libdeye.const import QUERY_DEVICE_STATE_COMMAND
from libdeye.device_state_command import DeyeDeviceState

_LOGGER = logging.getLogger(__name__)


class DeyeDataUpdateCoordinator(DataUpdateCoordinator):
def __init__(self, hass, device, mqtt_client, cloud_api):
super().__init__(
hass,
_LOGGER,
name="deye_data_update_coordinator",
update_method=self.poll_device_state,
update_interval=timedelta(seconds=10),
)
self._mqtt_client = mqtt_client
self._cloud_api = cloud_api
self.subscription_muted: CALLBACK_TYPE | None = None

self.data = DeyeDeviceState(
"1411000000370000000000000000003C3C0000000000" # 20°C/60%RH as the default state
)
self._device = device
self.device_available = self._device["online"]
"""When entity is added to Home Assistant."""
if self._device["platform"] == 1:
# self._mqtt_client.subscribe_availability_change(
# self._device["product_id"],
# self._device["device_id"],
# self.update_device_availability,
# )
self._mqtt_client.subscribe_state_change(
self._device["product_id"],
self._device["device_id"],
self.update_device_state,
)

self.receive_queue = asyncio.Queue()
self.device_available_queue = asyncio.Queue()

def mute_subscription_for_a_while(self) -> None:
"""Mute subscription for a while to avoid state bouncing."""
if self.subscription_muted:
self.subscription_muted()

@callback
def unmute(now: datetime) -> None:
self.subscription_muted = None

self.subscription_muted = async_call_later(self.hass, 20, unmute)

def update_device_state(self, state: DeyeDeviceState) -> None:
"""Will be called when received new DeyeDeviceState."""
self.receive_queue.put_nowait(state)
# self.async_set_updated_data(state)

async def async_request_refresh(self) -> None:
self.mute_subscription_for_a_while()
await super().async_request_refresh()

async def poll_device_state(self) -> DeyeDeviceState:
"""
Some Deye devices have a very long heartbeat period. So polling is still necessary to get the latest state as
quickly as possible.
"""
# _LOGGER.error("poll_device_state called: " + str(self._device["product_id"]))
if self.subscription_muted:
return self.data

device_list = list(
filter(
lambda d: d["product_type"] == "dehumidifier"
and d["device_id"] == self._device["device_id"],
await self._cloud_api.get_device_list(),
)
)
if len(device_list) > 0:
device = device_list[0]
self.device_available = device["online"]

if self._device["platform"] == 1:
self._mqtt_client.publish_command(
self._device["product_id"],
self._device["device_id"],
QUERY_DEVICE_STATE_COMMAND,
)
response = await asyncio.wait_for(
self.receive_queue.get(), timeout=10
) # 设置超时时间
# _LOGGER.error(response.to_command().json())
return response
elif self._device["platform"] == 2:
response = DeyeDeviceState(
await self._cloud_api.get_fog_platform_device_properties(
self._device["device_id"]
)
)
# _LOGGER.error(response.to_command().json())
return response
23 changes: 15 additions & 8 deletions custom_components/deye_dehumidifier/fan.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ def __init__(
self.entity_id = f"fan.{self.entity_id_base}_fan"
feature_config = get_product_feature_config(device["product_id"])
self._attr_supported_features = FanEntityFeature.SET_SPEED
if hasattr(FanEntityFeature, "TURN_ON"): # v2024.8
self._attr_supported_features |= FanEntityFeature.TURN_ON
if hasattr(FanEntityFeature, "TURN_OFF"):
self._attr_supported_features |= FanEntityFeature.TURN_OFF
if feature_config["oscillating"]:
self._attr_supported_features |= FanEntityFeature.OSCILLATE
self._named_fan_speeds = feature_config["fan_speed"]
Expand Down Expand Up @@ -82,16 +86,17 @@ def percentage(self) -> int:
async def async_oscillate(self, oscillating: bool) -> None:
"""Oscillate the fan."""
self.device_state.oscillating_switch = oscillating
await self.publish_command(self.device_state.to_command())
await self.publish_command_async("oscillating_switch", oscillating)

async def async_set_percentage(self, percentage: int) -> None:
"""Set the speed of the fan, as a percentage."""
if percentage == 0:
await self.async_turn_off()
self.device_state.fan_speed = percentage_to_ordered_list_item(
self._named_fan_speeds, percentage
fan_speed = int(
percentage_to_ordered_list_item(self._named_fan_speeds, percentage)
)
await self.publish_command(self.device_state.to_command())
self.device_state.fan_speed = fan_speed
await self.publish_command_async("fan_speed", fan_speed)

async def async_turn_on(
self,
Expand All @@ -101,13 +106,15 @@ async def async_turn_on(
) -> None:
"""Turn on the fan."""
self.device_state.power_switch = True
await self.publish_command_async("power_switch", True)
if percentage is not None:
self.device_state.fan_speed = percentage_to_ordered_list_item(
self._named_fan_speeds, percentage
fan_speed = int(
percentage_to_ordered_list_item(self._named_fan_speeds, percentage)
)
await self.publish_command(self.device_state.to_command())
self.device_state.fan_speed = fan_speed
await self.publish_command_async("fan_speed", fan_speed)

async def async_turn_off(self, **kwargs: Any) -> None:
"""Turn the entity off."""
self.device_state.power_switch = False
await self.publish_command(self.device_state.to_command())
await self.publish_command_async("power_switch", False)
Loading

0 comments on commit a7aae81

Please sign in to comment.