diff --git a/deebot_client/events/event_bus.py b/deebot_client/events/event_bus.py index 17f4ae04..3ed6283b 100644 --- a/deebot_client/events/event_bus.py +++ b/deebot_client/events/event_bus.py @@ -28,6 +28,7 @@ def __init__(self) -> None: ] = [] self.semaphore: Final = asyncio.Semaphore(1) self.last_event: T | None = None + self.notify_handle: asyncio.TimerHandle | None = None class EventBus: @@ -73,43 +74,56 @@ def unsubscribe() -> None: return unsubscribe - def notify(self, event: T) -> bool: + def notify(self, event: T, *, debounce_time: float = 0) -> None: """Notify subscriber with given event representation.""" event_processing_data = self._get_or_create_event_processing_data(type(event)) if ( - isinstance(event, StateEvent) - and event.state == VacuumState.IDLE - and event_processing_data.last_event - and event_processing_data.last_event.state == VacuumState.DOCKED # type: ignore[attr-defined] - ): - # todo distinguish better between docked and idle and outside event bus. # pylint: disable=fixme - # Problem getCleanInfo will return state=idle, when bot is charging - event = StateEvent(VacuumState.DOCKED) # type: ignore[assignment] - elif ( - isinstance(event, AvailabilityEvent) - and event.available - and event_processing_data.last_event - and not event_processing_data.last_event.available # type: ignore[attr-defined] - ): - # unavailable -> available: refresh everything - for event_type, _ in self._event_processing_dict.items(): - if event_type != AvailabilityEvent: - self.request_refresh(event_type) - - if event == event_processing_data.last_event: - _LOGGER.debug("Event is the same! Skipping (%s)", event) - return False - - event_processing_data.last_event = event - if event_processing_data.subscriber_callbacks: - _LOGGER.debug("Notify subscribers with %s", event) - for callback in event_processing_data.subscriber_callbacks: - create_task(self._tasks, callback(event)) - return True - - _LOGGER.debug("No subscribers... Discharging %s", event) - return False + handle := event_processing_data.notify_handle + ) is not None and not handle.cancelled(): + handle.cancel() + + def _notify(event: T) -> None: + event_processing_data.notify_handle = None + + if ( + isinstance(event, StateEvent) + and event.state == VacuumState.IDLE + and event_processing_data.last_event + and event_processing_data.last_event.state == VacuumState.DOCKED # type: ignore[attr-defined] + ): + # todo distinguish better between docked and idle and outside event bus. # pylint: disable=fixme + # Problem getCleanInfo will return state=idle, when bot is charging + event = StateEvent(VacuumState.DOCKED) # type: ignore[assignment] + elif ( + isinstance(event, AvailabilityEvent) + and event.available + and event_processing_data.last_event + and not event_processing_data.last_event.available # type: ignore[attr-defined] + ): + # unavailable -> available: refresh everything + for event_type, _ in self._event_processing_dict.items(): + if event_type != AvailabilityEvent: + self.request_refresh(event_type) + + if event == event_processing_data.last_event: + _LOGGER.debug("Event is the same! Skipping (%s)", event) + return + + event_processing_data.last_event = event + if event_processing_data.subscriber_callbacks: + _LOGGER.debug("Notify subscribers with %s", event) + for callback in event_processing_data.subscriber_callbacks: + create_task(self._tasks, callback(event)) + else: + _LOGGER.debug("No subscribers... Discharging %s", event) + + if debounce_time > 0: + event_processing_data.notify_handle = asyncio.get_running_loop().call_later( + debounce_time, _notify, event + ) + else: + _notify(event) def request_refresh(self, event_class: type[T]) -> None: """Request manual refresh.""" @@ -119,6 +133,9 @@ def request_refresh(self, event_class: type[T]) -> None: async def teardown(self) -> None: """Teardown eventbus.""" await cancel(self._tasks) + for data in self._event_processing_dict.values(): + if handle := data.notify_handle: + handle.cancel() async def _call_refresh_function(self, event_class: type[T]) -> None: semaphore = self._event_processing_dict[event_class].semaphore diff --git a/deebot_client/events/map.py b/deebot_client/events/map.py index c8bb40db..e3e5f8ea 100644 --- a/deebot_client/events/map.py +++ b/deebot_client/events/map.py @@ -102,4 +102,4 @@ class CachedMapInfoEvent(Event): class MapChangedEvent(Event): """Map changed event.""" - when: datetime = datetime.utcnow() + when: datetime diff --git a/deebot_client/map.py b/deebot_client/map.py index 4149ea5c..088f47ab 100644 --- a/deebot_client/map.py +++ b/deebot_client/map.py @@ -8,7 +8,7 @@ import struct import zlib from collections.abc import Callable, Coroutine -from datetime import datetime, timedelta +from datetime import datetime, timezone from io import BytesIO from typing import Any, Final @@ -539,11 +539,9 @@ def __init__(self, event_bus: EventBus) -> None: def on_change() -> None: self._changed = True - now = datetime.utcnow() - last_event = event_bus.get_last_event(MapChangedEvent) - if last_event is None or (now - last_event.when) > timedelta(seconds=1): - # throttle notify to ones a second - event_bus.notify(MapChangedEvent(now)) + event_bus.notify( + MapChangedEvent(datetime.now(timezone.utc)), debounce_time=1 + ) self._on_change = on_change self._map_pieces: OnChangedList[MapPiece] = OnChangedList( diff --git a/tests/events/test_event_bus.py b/tests/events/test_event_bus.py index 36a83235..3dc5f048 100644 --- a/tests/events/test_event_bus.py +++ b/tests/events/test_event_bus.py @@ -1,6 +1,7 @@ import asyncio from collections.abc import Callable -from unittest.mock import AsyncMock, call +from datetime import datetime, timezone +from unittest.mock import AsyncMock, call, patch import pytest @@ -8,6 +9,7 @@ from deebot_client.events.base import Event from deebot_client.events.const import EVENT_DTO_REFRESH_COMMANDS from deebot_client.events.event_bus import EventBus +from deebot_client.events.map import MapChangedEvent from deebot_client.models import VacuumState @@ -146,3 +148,68 @@ async def notify(state: VacuumState) -> None: mock.assert_called_once_with(StateEvent(expected)) else: assert event_bus.get_last_event(StateEvent) == StateEvent(last) + + +@pytest.mark.parametrize( + "debounce_time", + [-1, 0, 1], +) +async def test_debounce_time(event_bus: EventBus, debounce_time: float) -> None: + async def notify(event: MapChangedEvent, debounce_time: float) -> None: + event_bus.notify(event, debounce_time=debounce_time) + await asyncio.sleep(0.1) + + mock = AsyncMock() + event_bus.subscribe(MapChangedEvent, mock) + + with patch("deebot_client.events.event_bus.asyncio", wraps=asyncio) as aio: + + async def test_cycle() -> MapChangedEvent: + event = MapChangedEvent(datetime.now(timezone.utc)) + await notify(event, debounce_time) + if debounce_time > 0: + aio.get_running_loop.assert_called() + mock.assert_not_called() + else: + aio.get_running_loop.assert_not_called() + mock.assert_called_once_with(event) + mock.reset_mock() + + return event + + for _ in range(2): + await test_cycle() + event = await test_cycle() + + if debounce_time > 0: + await asyncio.sleep(debounce_time) + mock.assert_called_once_with(event) + mock.reset_mock() + + +async def test_teardown(event_bus: EventBus, execute_mock: AsyncMock) -> None: + # setup + async def wait() -> None: + await asyncio.sleep(1000) + + execute_mock.side_effect = wait + + mock = AsyncMock() + event_bus.subscribe(BatteryEvent, mock) + + event_bus.notify(BatteryEvent(100), debounce_time=10000) + event_bus.request_refresh(BatteryEvent) + + # verify tasks/handle still running + handle = event_bus._event_processing_dict[BatteryEvent].notify_handle + assert handle is not None + assert handle.cancelled() is False + assert len(event_bus._tasks) > 0 + + # test + await event_bus.teardown() + + # verify + assert handle is not None + assert handle.cancelled() is True + assert len(event_bus._tasks) == 0 diff --git a/tests/test_map.py b/tests/test_map.py index f0b990ec..5f81208e 100644 --- a/tests/test_map.py +++ b/tests/test_map.py @@ -1,8 +1,9 @@ -from time import sleep -from unittest.mock import Mock +import asyncio +from unittest.mock import AsyncMock import pytest +from deebot_client.events.event_bus import EventBus from deebot_client.events.map import MapChangedEvent, Position, PositionType from deebot_client.map import MapData, _calc_point from deebot_client.models import Room @@ -25,33 +26,26 @@ def test_calc_point( assert result == expected -def test_MapData() -> None: - last_event: MapChangedEvent | None = None - - def get_last_event(_: type[MapChangedEvent]) -> MapChangedEvent | None: - nonlocal last_event - return last_event - - def notify(event: MapChangedEvent) -> None: - nonlocal last_event - last_event = event - - event_bus = Mock() - event_bus.get_last_event.side_effect = get_last_event - event_bus.notify.side_effect = notify +async def test_MapData(event_bus: EventBus) -> None: + mock = AsyncMock() + event_bus.subscribe(MapChangedEvent, mock) map_data = MapData(event_bus) - def test_cycle() -> None: - for x in range(4): + async def test_cycle() -> None: + for x in range(10000): map_data.positions.append(Position(PositionType.DEEBOT, x, x)) map_data.rooms[x] = Room("test", x, "1,2") - assert event_bus.notify.call_count == 1 + assert map_data.changed is True + mock.assert_not_called() + + await asyncio.sleep(1.1) + mock.assert_called_once() - test_cycle() + await test_cycle() - event_bus.reset_mock() - sleep(1.1) + mock.reset_mock() + map_data.reset_changed() - test_cycle() + await test_cycle()