diff --git a/src/uiprotect/api.py b/src/uiprotect/api.py index 8aa77fb3..cd39aaae 100644 --- a/src/uiprotect/api.py +++ b/src/uiprotect/api.py @@ -1146,7 +1146,7 @@ async def get_bootstrap(self) -> Bootstrap: async def get_devices_raw(self, model_type: ModelType) -> list[dict[str, Any]]: """Gets a raw device list given a model_type""" - return await self.api_request_list(f"{model_type.value}s") + return await self.api_request_list(model_type.devices_key) async def get_devices( self, @@ -1703,7 +1703,7 @@ async def unadopt_device(self, model_type: ModelType, device_id: str) -> None: async def adopt_device(self, model_type: ModelType, device_id: str) -> None: """Adopts a device""" - key = f"{model_type.value}s" + key = model_type.devices_key data = await self.api_request_obj( "devices/adopt", method="post", diff --git a/src/uiprotect/data/bootstrap.py b/src/uiprotect/data/bootstrap.py index 898d4f0f..f10d9f2e 100644 --- a/src/uiprotect/data/bootstrap.py +++ b/src/uiprotect/data/bootstrap.py @@ -197,8 +197,8 @@ def unifi_dict_to_dict(cls, data: dict[str, Any]) -> dict[str, Any]: ) data["macLookup"] = {} data["idLookup"] = {} - for model_type in ModelType.bootstrap_models(): - key = f"{model_type}s" + for model_type in ModelType.bootstrap_models_types_set(): + key = model_type.devices_key items: dict[str, ProtectModel] = {} for item in data[key]: if ( @@ -234,8 +234,8 @@ def unifi_dict( if "idLookup" in data: del data["idLookup"] - for model_type in ModelType.bootstrap_models(): - attr = f"{model_type}s" + for model_type in ModelType.bootstrap_models_types_set(): + attr = model_type.devices_key if attr in data and isinstance(data[attr], dict): data[attr] = list(data[attr].values()) @@ -302,7 +302,7 @@ def get_device_from_mac(self, mac: str) -> ProtectAdoptableDeviceModel | None: if ref is None: return None - devices: dict[str, ProtectModelWithId] = getattr(self, f"{ref.model.value}s") + devices: dict[str, ProtectModelWithId] = getattr(self, ref.model.devices_key) return cast(ProtectAdoptableDeviceModel, devices.get(ref.id)) def get_device_from_id(self, device_id: str) -> ProtectAdoptableDeviceModel | None: @@ -310,7 +310,7 @@ def get_device_from_id(self, device_id: str) -> ProtectAdoptableDeviceModel | No ref = self.id_lookup.get(device_id) if ref is None: return None - devices: dict[str, ProtectModelWithId] = getattr(self, f"{ref.model.value}s") + devices: dict[str, ProtectModelWithId] = getattr(self, ref.model.devices_key) return cast(ProtectAdoptableDeviceModel, devices.get(ref.id)) def process_event(self, event: Event) -> None: @@ -344,21 +344,24 @@ def _create_stat( def _process_add_packet( self, + model_type: ModelType, packet: WSPacket, data: dict[str, Any], ) -> WSSubscriptionMessage | None: - obj = create_from_unifi_dict(data, api=self._api) - - if isinstance(obj, Event): + obj = create_from_unifi_dict(data, api=self._api, model_type=model_type) + if model_type is ModelType.EVENT: + if TYPE_CHECKING: + assert isinstance(obj, Event) self.process_event(obj) - elif isinstance(obj, NVR): + if model_type is ModelType.NVR: + if TYPE_CHECKING: + assert isinstance(obj, NVR) self.nvr = obj - elif ( - isinstance(obj, ProtectAdoptableDeviceModel) - and obj.model is not None - and obj.model.value in ModelType.bootstrap_models_set() - ): - key = f"{obj.model.value}s" + elif model_type in ModelType.bootstrap_models_types_set(): + if TYPE_CHECKING: + assert isinstance(obj, ProtectAdoptableDeviceModel) + assert isinstance(obj.model, ModelType) + key = obj.model.devices_key if not self._api.ignore_unadopted or ( obj.is_adopted and not obj.is_adopted_by_other ): @@ -381,9 +384,12 @@ def _process_add_packet( new_obj=obj, ) - def _process_remove_packet(self, packet: WSPacket) -> WSSubscriptionMessage | None: - model: str | None = packet.action_frame.data.get("modelKey") - devices: dict[str, ProtectDeviceModel] | None = getattr(self, f"{model}s", None) + def _process_remove_packet( + self, model_type: ModelType, packet: WSPacket + ) -> WSSubscriptionMessage | None: + devices: dict[str, ProtectDeviceModel] | None = getattr( + self, model_type.devices_key, None + ) if devices is None: return None @@ -443,12 +449,12 @@ def _process_nvr_update( def _process_device_update( self, + model_type: ModelType, packet: WSPacket, action: dict[str, Any], data: dict[str, Any], ignore_stats: bool, ) -> WSSubscriptionMessage | None: - model_type = action["modelKey"] remove_keys = ( STATS_AND_IGNORE_DEVICE_KEYS if ignore_stats else IGNORE_DEVICE_KEYS ) @@ -456,33 +462,31 @@ def _process_device_update( del data[key] # `last_motion` from cameras update every 100 milliseconds when a motion event is active # this overrides the behavior to only update `last_motion` when a new event starts - if model_type == "camera" and "lastMotion" in data: + if model_type is ModelType.CAMERA and "lastMotion" in data: del data["lastMotion"] # nothing left to process if not data: self._create_stat(packet, None, True) return None - key = f"{model_type}s" - devices: dict[str, ProtectModelWithId] = getattr(self, key) + devices: dict[str, ProtectModelWithId] = getattr(self, model_type.devices_key) action_id: str = action["id"] if action_id not in devices: # ignore updates to events that phase out - if model_type != _ModelType_Event_value: + if model_type is not ModelType.EVENT: _LOGGER.debug("Unexpected %s: %s", key, action_id) return None obj = devices[action_id] - model = obj.model data = obj.unifi_dict_to_dict(data) old_obj = obj.copy() obj = obj.update_from_dict(deepcopy(data)) - if model is ModelType.EVENT: + if model_type is ModelType.EVENT: if TYPE_CHECKING: assert isinstance(obj, Event) self.process_event(obj) - elif model is ModelType.CAMERA: + elif model_type is ModelType.CAMERA: if TYPE_CHECKING: assert isinstance(obj, Camera) if "last_ring" in data and obj.last_ring: @@ -490,7 +494,7 @@ def _process_device_update( _LOGGER.debug("last_ring for %s (%s)", obj.id, is_recent) if is_recent: obj.set_ring_timeout() - elif model is ModelType.SENSOR: + elif model_type is ModelType.SENSOR: if TYPE_CHECKING: assert isinstance(obj, Sensor) if "alarm_triggered_at" in data and obj.alarm_triggered_at: @@ -532,13 +536,14 @@ def process_ws_packet( self._create_stat(packet, None, True) return None - if models and ModelType(model_key) not in models: + model_type = ModelType.from_string(model_key) + if models and model_type not in models: self._create_stat(packet, None, True) return None action_action: str = action["action"] if action_action == "remove": - return self._process_remove_packet(packet) + return self._process_remove_packet(model_type, packet) if not data: self._create_stat(packet, None, True) @@ -546,16 +551,13 @@ def process_ws_packet( try: if action_action == "add": - return self._process_add_packet(packet, data) + return self._process_add_packet(model_type, packet, data) if action_action == "update": - if model_key == _ModelType_NVR_value: + if model_type is ModelType.NVR: return self._process_nvr_update(packet, data, ignore_stats) - - if ( - model_key in ModelType.bootstrap_models_set() - or model_key == _ModelType_Event_value - ): + if model_type in ModelType.bootstrap_models_types_and_event_set(): return self._process_device_update( + model_type, packet, action, data, @@ -576,7 +578,7 @@ def _handle_ws_error(self, action: dict[str, Any], err: Exception) -> None: msg = f"Validation error processing event: {action['id']}. Ignoring event." else: try: - model_type = ModelType(action["modelKey"]) + model_type = ModelType.from_string(action["modelKey"]) device_id: str = action["id"] task = asyncio.create_task(self.refresh_device(model_type, device_id)) self._refresh_tasks.add(task) @@ -609,7 +611,7 @@ async def refresh_device(self, model_type: ModelType, device_id: str) -> None: self.nvr = device else: devices: dict[str, ProtectModelWithId] = getattr( - self, f"{model_type.value}s" + self, model_type.devices_key ) devices[device.id] = device _LOGGER.debug("Successfully refresh model: %s %s", model_type, device_id) diff --git a/src/uiprotect/data/convert.py b/src/uiprotect/data/convert.py index 4e92b27d..9995d778 100644 --- a/src/uiprotect/data/convert.py +++ b/src/uiprotect/data/convert.py @@ -63,6 +63,7 @@ def create_from_unifi_dict( data: dict[str, Any], api: ProtectApiClient | None = None, klass: type[ProtectModel] | None = None, + model_type: ModelType | None = None, ) -> ProtectModel: """ Helper method to read the `modelKey` from a UFP JSON dict and convert to currect Python class. @@ -71,6 +72,9 @@ def create_from_unifi_dict( if "modelKey" not in data: raise DataDecodeError("No modelKey") + if model_type is not None and klass is None: + klass = MODEL_TO_CLASS.get(model_type) + if klass is None: klass = get_klass_from_dict(data) diff --git a/src/uiprotect/data/types.py b/src/uiprotect/data/types.py index 2f490192..5ac1ad0a 100644 --- a/src/uiprotect/data/types.py +++ b/src/uiprotect/data/types.py @@ -2,7 +2,7 @@ import enum from collections.abc import Callable, Coroutine -from functools import cache +from functools import cache, cached_property from typing import Any, Literal, Optional, TypeVar, Union from packaging.version import Version as BaseVersion @@ -109,31 +109,62 @@ class ModelType(str, UnknownValuesEnumMixin, enum.Enum): RECORDING_SCHEDULE = "recordingSchedule" UNKNOWN = "unknown" + @cached_property + def devices_key(self) -> str: + """Return the devices key.""" + return f"{self.value}s" + + @classmethod + @cache + def from_string(cls, value: str) -> ModelType: + return cls(value) + @staticmethod @cache - def bootstrap_models() -> tuple[str, ...]: + def bootstrap_model_types() -> tuple[ModelType, ...]: + """Return the bootstrap models as a tuple.""" # TODO: # legacyUFV # display - return ( - ModelType.CAMERA.value, - ModelType.USER.value, - ModelType.GROUP.value, - ModelType.LIVEVIEW.value, - ModelType.VIEWPORT.value, - ModelType.LIGHT.value, - ModelType.BRIDGE.value, - ModelType.SENSOR.value, - ModelType.DOORLOCK.value, - ModelType.CHIME.value, + ModelType.CAMERA, + ModelType.USER, + ModelType.GROUP, + ModelType.LIVEVIEW, + ModelType.VIEWPORT, + ModelType.LIGHT, + ModelType.BRIDGE, + ModelType.SENSOR, + ModelType.DOORLOCK, + ModelType.CHIME, + ) + + @staticmethod + @cache + def bootstrap_models() -> tuple[str, ...]: + """Return the bootstrap models strings as a tuple.""" + return tuple( + model_type.value for model_type in ModelType.bootstrap_model_types() ) @staticmethod @cache def bootstrap_models_set() -> set[str]: + """Return the set of bootstrap models strings as a set.""" return set(ModelType.bootstrap_models()) + @staticmethod + @cache + def bootstrap_models_types_set() -> set[ModelType]: + """Return the set of bootstrap models as a set.""" + return set(ModelType.bootstrap_model_types()) + + @staticmethod + @cache + def bootstrap_models_types_and_event_set() -> set[ModelType]: + """Return the set of bootstrap models and the event model as a set.""" + return ModelType.bootstrap_models_types_set() | {ModelType.EVENT} + @enum.unique class EventType(str, ValuesEnumMixin, enum.Enum): diff --git a/src/uiprotect/data/user.py b/src/uiprotect/data/user.py index 015390b1..d0c9a26c 100644 --- a/src/uiprotect/data/user.py +++ b/src/uiprotect/data/user.py @@ -54,7 +54,7 @@ def objs(self) -> list[ProtectModelWithId] | None: if self.obj_ids == {"self"} or self.obj_ids is None: return None - devices = getattr(self._api.bootstrap, f"{self.model.value}s") + devices = getattr(self._api.bootstrap, self.model.devices_key) return [devices[oid] for oid in self.obj_ids]