Skip to content

Commit

Permalink
fix: make creation of update sync primitives lazy (#111)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco authored Jun 23, 2024
1 parent b4477ab commit b05af57
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 42 deletions.
66 changes: 37 additions & 29 deletions src/uiprotect/data/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import logging
from collections.abc import Callable
from datetime import datetime, timedelta
from functools import cache
from functools import cache, cached_property
from ipaddress import IPv4Address
from typing import TYPE_CHECKING, Any, ClassVar, TypeVar
from uuid import UUID
Expand Down Expand Up @@ -536,32 +536,40 @@ def unifi_dict(
return data


class UpdateSynchronization:
"""Helper class for managing updates to Protect devices."""

@cached_property
def lock(self) -> asyncio.Lock:
"""Lock to prevent multiple updates at once."""
return asyncio.Lock()

@cached_property
def queue(self) -> asyncio.Queue[Callable[[], None]]:
"""Queue to store device updates."""
return asyncio.Queue()

@cached_property
def event(self) -> asyncio.Event:
"""Event to signal when a device update has been queued."""
return asyncio.Event()


class ProtectModelWithId(ProtectModel):
id: str

_update_lock: asyncio.Lock = PrivateAttr(None)
_update_queue: asyncio.Queue[Callable[[], None]] = PrivateAttr(None)
_update_event: asyncio.Event = PrivateAttr(None)
_update_sync: UpdateSynchronization = PrivateAttr(None)

def __init__(self, **data: Any) -> None:
update_lock = data.pop("update_lock", None)
update_queue = data.pop("update_queue", None)
update_event = data.pop("update_event", None)
update_sync = data.pop("update_sync", None)
super().__init__(**data)
self._update_lock = update_lock or asyncio.Lock()
self._update_queue = update_queue or asyncio.Queue()
self._update_event = update_event or asyncio.Event()
self._update_sync = update_sync or UpdateSynchronization()

@classmethod
def construct(cls, _fields_set: set[str] | None = None, **values: Any) -> Self:
update_lock = values.pop("update_lock", None)
update_queue = values.pop("update_queue", None)
update_event = values.pop("update_event", None)
update_sync = values.pop("update_sync", None)
obj = super().construct(_fields_set=_fields_set, **values)
obj._update_lock = update_lock or asyncio.Lock()
obj._update_queue = update_queue or asyncio.Queue()
obj._update_event = update_event or asyncio.Event()

obj._update_sync = update_sync or UpdateSynchronization()
return obj

@classmethod
Expand Down Expand Up @@ -609,28 +617,28 @@ async def queue_update(self, callback: Callable[[], None]) -> None:
This allows aggregating devices updates so if multiple ones come in all at once,
they can be combined in a single PATCH.
"""
self._update_queue.put_nowait(callback)
self._update_sync.queue.put_nowait(callback)

self._update_event.set()
self._update_sync.event.set()
await asyncio.sleep(
0.001,
) # release execution so other `queue_update` calls can abort
self._update_event.clear()
self._update_sync.event.clear()

try:
async with asyncio_timeout(0.05):
await self._update_event.wait()
self._update_event.clear()
await self._update_sync.event.wait()
self._update_sync.event.clear()
return
except (TimeoutError, asyncio.TimeoutError, asyncio.CancelledError):
async with self._update_lock:
async with self._update_sync.lock:
# Important! Now that we have the lock, we yield to the event loop so any
# updates from the websocket are processed before we generate the diff
await asyncio.sleep(0)
# Save the initial data before we generate the diff
data_before_changes = self.dict_with_excludes()
while not self._update_queue.empty():
callback = self._update_queue.get_nowait()
while not self._update_sync.queue.empty():
callback = self._update_sync.queue.get_nowait()
callback()
# Important, do not yield to the event loop before generating the diff
# otherwise we may miss updates from the websocket
Expand Down Expand Up @@ -660,8 +668,8 @@ async def save_device(
"""
# do not allow multiple save_device calls at once
release_lock = False
if not self._update_lock.locked():
await self._update_lock.acquire()
if not self._update_sync.lock.locked():
await self._update_sync.lock.acquire()
release_lock = True

try:
Expand All @@ -673,7 +681,7 @@ async def save_device(
)
finally:
if release_lock:
self._update_lock.release()
self._update_sync.lock.release()

async def _save_device_changes(
self,
Expand All @@ -692,7 +700,7 @@ async def _save_device_changes(
)

assert (
self._update_lock.locked()
self._update_sync.lock.locked()
), "save_device_changes should only be called when the update lock is held"
read_only_fields = self.__class__._get_read_only_fields()

Expand Down
21 changes: 9 additions & 12 deletions src/uiprotect/data/devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,9 @@ def camera(self) -> Camera | None:

async def set_paired_camera(self, camera: Camera | None) -> None:
"""Sets the camera paired with the light"""
async with self._update_lock:
await asyncio.sleep(
0,
) # yield to the event loop once we have the lock to process any pending updates
async with self._update_sync.lock:
# yield to the event loop once we have the lock to process any pending updates
await asyncio.sleep(0)
data_before_changes = self.dict_with_excludes()
if camera is None:
self.camera_id = None
Expand Down Expand Up @@ -2378,10 +2377,9 @@ async def set_lcd_text(
raise BadRequest("Camera does not have an LCD screen")

if text_type is None:
async with self._update_lock:
await asyncio.sleep(
0,
) # yield to the event loop once we have the lock to process any pending updates
async with self._update_sync.lock:
# yield to the event loop once we have the lock to process any pending updates
await asyncio.sleep(0)
data_before_changes = self.dict_with_excludes()
self.lcd_message = None
# UniFi Protect bug: clearing LCD text message does _not_ emit a WS message
Expand Down Expand Up @@ -2704,10 +2702,9 @@ async def set_liveview(self, liveview: Liveview) -> None:
if self._api is not None and liveview.id not in self._api.bootstrap.liveviews:
raise BadRequest("Unknown liveview")

async with self._update_lock:
await asyncio.sleep(
0,
) # yield to the event loop once we have the lock to process any pending updates
async with self._update_sync.lock:
# yield to the event loop once we have the lock to process any pending updates
await asyncio.sleep(0)
data_before_changes = self.dict_with_excludes()
self.liveview_id = liveview.id
# UniFi Protect bug: changing the liveview does _not_ emit a WS message
Expand Down
2 changes: 1 addition & 1 deletion src/uiprotect/data/nvr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ async def _update_doorbell_messages(
self, update_callback: Callable[[], None]
) -> None:
"""Updates doorbell messages and saves to Protect."""
async with self._update_lock:
async with self._update_sync.lock:
# yield to the event loop once we have the lock to ensure websocket updates are processed
await asyncio.sleep(0)
data_before_changes = self.dict_with_excludes()
Expand Down

0 comments on commit b05af57

Please sign in to comment.