From 23e56612d44a76096dfb8f7ec3042a4c67dd1a07 Mon Sep 17 00:00:00 2001 From: jkhalil Date: Thu, 24 Aug 2023 16:24:27 -0700 Subject: [PATCH] Update Websocket connection to handle retries and multiple data streams. Update to add PH Binary Sensors --- .../eheim_digital/binary_sensor.py | 211 ++++++++++++++++++ custom_components/eheim_digital/const.py | 2 +- .../eheim_digital/coordinator.py | 13 +- custom_components/eheim_digital/sensor.py | 39 +++- custom_components/eheim_digital/websocket.py | 178 ++++++++++----- 5 files changed, 377 insertions(+), 66 deletions(-) create mode 100644 custom_components/eheim_digital/binary_sensor.py diff --git a/custom_components/eheim_digital/binary_sensor.py b/custom_components/eheim_digital/binary_sensor.py new file mode 100644 index 0000000..7f82322 --- /dev/null +++ b/custom_components/eheim_digital/binary_sensor.py @@ -0,0 +1,211 @@ +"""Platform for BinarySensor integration""" +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass +from typing import Any + +from homeassistant.components.binary_sensor import ( + BinarySensorDeviceClass, + BinarySensorEntity, + BinarySensorEntityDescription, +) + +from homeassistant.config_entries import ConfigEntry +from homeassistant.core import HomeAssistant, callback +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.typing import StateType +from homeassistant.helpers.update_coordinator import CoordinatorEntity +from homeassistant.helpers.device_registry import format_mac + +from . import EheimDigitalDataUpdateCoordinator +from .devices import EheimDevice +from .const import LOGGER, DOMAIN + + +@dataclass +class EheimBinarySensorDescriptionMixin: + """Mixin for Eheim binary sensor.""" + + value_fn: Callable[[dict[str, Any]], StateType] + + +@dataclass +class EheimBinarySensorDescription( + BinarySensorEntityDescription, EheimBinarySensorDescriptionMixin +): + """Class describing Eheim binary sensor entities.""" + + attr_fn: Callable[[dict[str, Any]], dict[str, StateType]] = lambda _: {} + + +BINARY_SENSOR_DESCRIPTIONS: tuple[EheimBinarySensorDescription, ...] = ( + # Heater Binary Sensors + EheimBinarySensorDescription( + key="heater_is_heating", + device_class=BinarySensorDeviceClass.RUNNING, + name="Heater Is Heating", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("isHeating"), + ), + EheimBinarySensorDescription( + key="heater_alert", + device_class=BinarySensorDeviceClass.PROBLEM, + name="Heater Alert", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("alert_State"), + ), + EheimBinarySensorDescription( + key="heater_is_active", + device_class=BinarySensorDeviceClass.RUNNING, + name="Heater State", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("active"), + ), + # PH Control Binary Sensors + EheimBinarySensorDescription( + key="ph_control_acclimatization", + device_class=BinarySensorDeviceClass.RUNNING, + name="PH Acclimatization", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("acclimatization"), + ), + EheimBinarySensorDescription( + key="ph_control_is_active", + device_class=BinarySensorDeviceClass.RUNNING, + name="PH Is Active", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("active"), + ), + EheimBinarySensorDescription( + key="ph_control_alert", + device_class=BinarySensorDeviceClass.PROBLEM, + name="PH Alert", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("alertState"), + ), + EheimBinarySensorDescription( + key="ph_control_is_valve_active", + device_class=BinarySensorDeviceClass.RUNNING, + name="PH Valve Is Active", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("valveIsActive"), + ), + # Filter Binary Sensors + EheimBinarySensorDescription( + key="filter_is_active", + device_class=BinarySensorDeviceClass.RUNNING, + name="Filter State", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("FilterActive"), + ), +) + + +BINARY_SENSOR_GROUPS = { + # Define binary sensor groups similar to SENSOR_GROUPS + "heater": ["heater_is_heating", "heater_alert", "heater_is_active"], + "ph_control": [ + "ph_control_acclimatization", + "ph_control_is_active", + "ph_control_alert", + "ph_control_is_valve_active", + "ph_control_firmware_update", + ], + "filter": ["filter_is_active"], +} + + +async def async_setup_entry( + hass: HomeAssistant, entry: ConfigEntry, async_add_entities: AddEntitiesCallback +) -> None: + """Add EheimDevice entities from a config_entry.""" + LOGGER.debug("Setting up Eheim Digital BinarySensor platform") + + coordinator: EheimDigitalDataUpdateCoordinator = hass.data[DOMAIN][entry.entry_id] + + binary_sensors = [] + for device in coordinator.devices: + device_group = device.device_group + binary_sensor_keys_for_group = BINARY_SENSOR_GROUPS.get(device_group, []) + + device_data = _get_binary_sensor_data(coordinator.data, device.mac) + + for description in BINARY_SENSOR_DESCRIPTIONS: + if description.key in binary_sensor_keys_for_group: + binary_sensors.append( + EheimBinarySensor(coordinator, description, device, device_data) + ) + + async_add_entities(binary_sensors, True) + + +class EheimBinarySensor( + CoordinatorEntity[EheimDigitalDataUpdateCoordinator], BinarySensorEntity +): + "Define an Eheim BinarySensor Entity" + + _attr_has_entity_name = True + entity_description: EheimBinarySensorDescription + + def __init__( + self, + coordinator: EheimDigitalDataUpdateCoordinator, + description: EheimBinarySensorDescription, + device: EheimDevice, + device_data: dict[str, Any], + ) -> None: + """Initialize the BinarySensor.""" + super().__init__(coordinator) + self.entity_description = description + self._sensor_data = coordinator.data[device.mac] + self._device = device + LOGGER.debug( + "Initializing Eheim BinarySensor for Device: %s Entity: %s", + self._device.mac, + self.entity_description.key, + ) + + @property + def is_on(self) -> bool: + """Return True if the binary sensor is on.""" + return bool(self.entity_description.value_fn(self._sensor_data)) + + @property + def unique_id(self) -> str: + """Return the unique ID for this binary sensor.""" + return f"{self._device.model.lower().replace(' ', '_')}_{format_mac(self._device.mac).replace(':','_')}_{self.entity_description.key}" + + @callback + def _handle_coordinator_update(self) -> None: + "Handle updated data from the coordinator." "" + self._sensor_data = self.coordinator.data[self._device.mac] + self.async_write_ha_state() + + @property + def device_info(self): + return { + "identifiers": {(DOMAIN, self._device.mac)}, + "name": self._device.name, + "manufacturer": "Eheim", + "model": self._device.model, + } + + +def _get_binary_sensor_data(sensors: dict[str, Any], mac_address: str) -> Any: + """Get the binary sensor data for a sensor type.""" + if sensors is None: + LOGGER.warning( + "Binary sensor data is None when trying to fetch %s", mac_address + ) + return None + + # Form the key using device_type and mac_address + key = mac_address + + data = sensors.get(key) + if data is None: + LOGGER.warning("No data found for key: %s", key) + else: + LOGGER.debug("Received binary sensor data for %s: %s", key, data) + return data diff --git a/custom_components/eheim_digital/const.py b/custom_components/eheim_digital/const.py index ef11195..286b285 100755 --- a/custom_components/eheim_digital/const.py +++ b/custom_components/eheim_digital/const.py @@ -7,7 +7,7 @@ DOMAIN = "eheim_digital" VERSION = "0.0.1" UPDATE_INTERVAL = 30 -PLATFORMS = ["sensor"] # ["sensor", "binary_sensor", "light"] +PLATFORMS = ["sensor", "binary_sensor"] # ["sensor", "binary_sensor", "light"] DEVICE_TYPES = { "filter": {"name": "Filter", "icon": "mdi:filter"}, diff --git a/custom_components/eheim_digital/coordinator.py b/custom_components/eheim_digital/coordinator.py index 4490a40..7e1138a 100644 --- a/custom_components/eheim_digital/coordinator.py +++ b/custom_components/eheim_digital/coordinator.py @@ -29,11 +29,13 @@ def __init__( super().__init__(hass, LOGGER, name=DOMAIN, update_interval=update_interval) # hass.async_create_task(self._async_update_data()) - LOGGER.debug("COORDINATOR: Initialized DataUpdateCoordinator With Tasks") async def _async_update_data(self) -> dict[str, Any]: """Update data via Websocket.""" all_device_data = {} + LOGGER.debug("COORDINATOR: Starting data update") + num_devices = len(self.devices) + LOGGER.debug("COORDINATOR: Number of devices: %s", num_devices) try: LOGGER.debug("COORDINATOR: Calling WebSocket to update data in Coordinator") for device in self.devices: @@ -42,8 +44,15 @@ async def _async_update_data(self) -> dict[str, Any]: all_device_data[device.mac] = device_data # device_data = await self.websocket_client.get_device_data(device) # all_device_data[device.device_type, device.mac] = device_data - LOGGER.debug("COORDINATOR: Data in Coordinator: %s", device_data) + LOGGER.debug( + "COORDINATOR: Device %s data in Coordinator: %s", + device, + device_data, + ) except Exception as error: raise UpdateFailed(error) from error + LOGGER.debug( + "COORDINATOR: Final aggregated data in Coordinator: %s", all_device_data + ) return all_device_data diff --git a/custom_components/eheim_digital/sensor.py b/custom_components/eheim_digital/sensor.py index e1eb0a1..4fb0e20 100644 --- a/custom_components/eheim_digital/sensor.py +++ b/custom_components/eheim_digital/sensor.py @@ -68,8 +68,9 @@ class EheimSensorDescription(SensorEntityDescription, EheimSensorDescriptionMixi icon="mdi:timer", name="Operating Time", entity_registry_enabled_default=True, - native_unit_of_measurement="h", - value_fn=lambda data: int(data.get("actualTime") / 60), + native_unit_of_measurement="d", + state_class="total_increasing", + value_fn=lambda data: data.get("actualTime") / (1440 * 24), ), EheimSensorDescription( key="night_mode_end_time", @@ -126,7 +127,9 @@ class EheimSensorDescription(SensorEntityDescription, EheimSensorDescriptionMixi name="Brightness", entity_registry_enabled_default=True, native_unit_of_measurement="%", - value_fn=lambda data: round(sum(data["currentValues"]) / len(data["currentValues"])), + value_fn=lambda data: round( + sum(data["currentValues"]) / len(data["currentValues"]) + ), ), EheimSensorDescription( key="ccv_brightness_white", @@ -155,11 +158,32 @@ class EheimSensorDescription(SensorEntityDescription, EheimSensorDescriptionMixi # PH Control Sensors EheimSensorDescription( key="ph_current_ph", - icon="mdi:ph", + device_class=SensorDeviceClass.PH, name="Current PH", entity_registry_enabled_default=True, value_fn=lambda data: round((int(data["isPH"]) / 10), 1), ), + EheimSensorDescription( + key="ph_target_ph", + device_class=SensorDeviceClass.PH, + name="Target PH", + entity_registry_enabled_default=True, + value_fn=lambda data: round((int(data["sollPH"]) / 10), 1), + ), + EheimSensorDescription( + key="ph_dayStart_time", + device_class=SensorDeviceClass.TIMESTAMP, + name="Day Start Time", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("dayStartT"), + ), + EheimSensorDescription( + key="ph_nightStart_time", + device_class=SensorDeviceClass.TIMESTAMP, + name="Night Start Time", + entity_registry_enabled_default=True, + value_fn=lambda data: data.get("nightStartT"), + ), ) SENSOR_GROUPS = { @@ -179,7 +203,12 @@ class EheimSensorDescription(SensorEntityDescription, EheimSensorDescriptionMixi "filter_turn_off_time", "filter_turn_off_time", ], - "ph_control": ["ph_current_ph"], + "ph_control": [ + "ph_current_ph", + "ph_target_ph", + "ph_dayStart_time", + "ph_nightStart_time", + ], "other": [], } diff --git a/custom_components/eheim_digital/websocket.py b/custom_components/eheim_digital/websocket.py index 8127dc1..3257853 100755 --- a/custom_components/eheim_digital/websocket.py +++ b/custom_components/eheim_digital/websocket.py @@ -27,36 +27,86 @@ def __init__(self, host: str) -> None: self._devices = None self._client_list = None self._lock = asyncio.Lock() + self.buffer = [] + self.send_interval = 1 # 1 second + self.max_retries = 3 # Maximum number of reconnection attempts + self.heartbeat_interval = 30 # 30 seconds + + @property + def is_connected(self): + return self._websocket is not None and not self._websocket.closed + + async def reconnect(self): + """Attempt to reconnect to the server.""" + retry_count = 0 + while not self.is_connected and retry_count < self.max_retries: + try: + await self.connect_websocket() + LOGGER.info(f"Reconnected on attempt {retry_count + 1}") + return + except Exception as e: + LOGGER.error(f"Reconnection attempt {retry_count + 1} failed: {e}") + retry_count += 1 + await asyncio.sleep(5) + LOGGER.error("Failed to reconnect after max retries") + + async def start_heartbeat(self): + """Send a heartbeat message at regular intervals.""" + while self.is_connected: + await self._send_message({"title": "GET_MESH_NETWORK", "to": "MASTER"}) + await asyncio.sleep(self.heartbeat_interval) + + async def buffered_send(self, message: Dict): + """Add the message to the buffer.""" + self.buffer.append(message) + + async def process_buffer(self): + """Process and send messages from the buffer at regular intervals.""" + while self.is_connected: + if self.buffer: + message = self.buffer.pop(0) + await self._send_message(message) + await asyncio.sleep(self.send_interval) + + async def check_connection(self): + """Check the WebSocket connection and reconnect if necessary.""" + if not self.is_connected: + await self.reconnect() async def connect_websocket(self) -> None: """Connect to the WebSocket server and process initial messages.""" - LOGGER.debug("WEBSOCKET: Called function connect_websocket") - try: - self._websocket = await websockets.connect(self._url) # pylint: disable=all - - # Process the first two initial messages - for _ in range(2): - initial_response = await self._websocket.recv() - messages = json.loads(initial_response) - LOGGER.debug("WEBSOCKET: Initial WebSocket Response: %s", messages) - - # Extracting and storing the client list - for message in messages: - if "clientList" in message: - self._client_list = list(set(message["clientList"])) - break - - LOGGER.debug("WEBSOCKET: Client List: %s", self._client_list) - except Exception as ex: - raise EheimDigitalWebSocketClientCommunicationError( - f"Failed to connect to WebSocket: {ex}" - ) from ex + async with self._lock: # Ensure only one connection attempt at a time + LOGGER.debug("WEBSOCKET: Called function connect_websocket") + try: + self._websocket = await websockets.connect( + self._url, subprotocols=["arduino"] + ) # pylint: disable=all + + # Process the first two initial messages + for _ in range(2): + initial_response = await self._websocket.recv() + messages = json.loads(initial_response) + LOGGER.debug("WEBSOCKET: Initial WebSocket Response: %s", messages) + + # Extracting and storing the client list + for message in messages: + if "clientList" in message: + self._client_list = list(set(message["clientList"])) + break + + LOGGER.debug("WEBSOCKET: Client List: %s", self._client_list) + except Exception as ex: + raise EheimDigitalWebSocketClientCommunicationError( + f"Failed to connect to WebSocket: {ex}" + ) from ex async def disconnect_websocket(self) -> None: """Disconnect from the WebSocket server.""" - if self._websocket: - await self._websocket.close() - self._websocket = None + async with self._lock: # Ensure only one disconnection attempt at a time + LOGGER.debug("WEBSOCKET: Called function disconnect_websocket") + if self._websocket: + await self._websocket.close() + self._websocket = None async def fetch_devices(self) -> list[EheimDevice]: """Fetch devices information and data from the WebSocket.""" @@ -69,37 +119,38 @@ async def fetch_devices(self) -> list[EheimDevice]: # Initialize devices as an empty list devices = [] - # Iterate through the unique clients and send requests for device information - for client in self._client_list: - request_message = ( - f'{{"title": "GET_USRDTA","to": "{client}","from": "USER"}}' - ) - LOGGER.debug( - "WEBSOCKET: Sending Device Client: %s Request Message: %s, ", - client, - request_message, - ) - await self._websocket.send(request_message) - response = await self._websocket.recv() - - # Log the raw response before processing - # LOGGER.debug("WEBSOCKET: Raw Response for Device Client: %s : %s", client, response) - - messages = json.loads(response) - LOGGER.debug( - "WEBSOCKET: Receiving Device Client: %s Response: %s", client, messages - ) + # Iterate through the unique clients, send requests for device information, and process the responses + async with self._lock: + for client in self._client_list: + request_message = ( + f'{{"title": "GET_USRDTA","to": "{client}","from": "USER"}}' + ) + LOGGER.debug( + "WEBSOCKET: Sending Device Client: %s Request Message: %s, ", + client, + request_message, + ) + await self._websocket.send(request_message) + response = await self._websocket.recv() - # Process the response and extract the device information - if isinstance(messages, list): - for message in messages: - if message["title"] == "USRDTA": - device = EheimDevice(message) - devices.append(device) - elif isinstance(messages, dict) and messages["title"] == "USRDTA": - device = EheimDevice(messages) - devices.append(device) - LOGGER.debug("WEBSOCKET: Devices: %s", devices) + messages = json.loads(response) + LOGGER.debug( + "WEBSOCKET: Receiving Device Client: %s Response: %s", + client, + messages, + ) + + # Process the response and extract the device information + if isinstance(messages, list): + for message in messages: + if message["title"] == "USRDTA": + device = EheimDevice(message) + devices.append(device) + elif isinstance(messages, dict) and messages["title"] == "USRDTA": + device = EheimDevice(messages) + devices.append(device) + + LOGGER.debug("WEBSOCKET: Devices: %s", devices) # Log the extracted details for device in devices: @@ -119,7 +170,7 @@ async def fetch_devices(self) -> list[EheimDevice]: # Send Request/Command to Device async def _send_message(self, message): """Send a specific message to the device and wait for its response.""" - + await self.check_connection() async with self._lock: # Ensure only one request is active at a time if self._websocket is None: await self.connect_websocket() @@ -134,7 +185,8 @@ async def _send_message(self, message): if response_dict.get("title") in ["REQ_KEEP_ALIVE", "KEEP_ALIVE"]: LOGGER.debug( - "WEBSOCKET: Received keep-alive. Continuing to wait for actual response." + "WEBSOCKET: Received keep-alive. Continuing to wait for response to: %s", + message_str, ) continue # Ignore keep-alives and continue waiting for the actual response @@ -284,8 +336,9 @@ async def get_ph_data(self, mac_address: str): "ph_control": [get_ph_data], } - async def get_device_data(self, device: EheimDevice) -> Dict: + async def get_device_data(self, device: EheimDevice) -> dict: """Get data for all devices.""" + await self.check_connection() device_type = device.device_type device_group = device.device_group functions = self.DEVICE_DATA_FUNCTIONS.get(device_group, []) @@ -298,8 +351,17 @@ async def get_device_data(self, device: EheimDevice) -> Dict: device_data = {} for function in functions: + LOGGER.debug( + "WEBSOCKET: Starting function %s for device %s", + function.__name__, + device.mac, + ) response = await function(self, device.mac) response_dict = json.loads(response) device_data.update(response_dict) - # LOGGER.debug("WEBSOCKET: All Device Data: %s", device_data) + LOGGER.debug( + "WEBSOCKET: Completed function %s for device %s", + function.__name__, + device.mac, + ) return device_data