Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MapChangedEvent needs debounce and not throttle #284

Merged
merged 3 commits into from
Jul 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 50 additions & 33 deletions deebot_client/events/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
] = []
self.semaphore: Final = asyncio.Semaphore(1)
self.last_event: T | None = None
self.notify_handle: asyncio.TimerHandle | None = None


class EventBus:
Expand Down Expand Up @@ -73,43 +74,56 @@

return unsubscribe

def notify(self, event: T) -> bool:
def notify(self, event: T, *, debounce_time: float = 0) -> None:
"""Notify subscriber with given event representation."""
event_processing_data = self._get_or_create_event_processing_data(type(event))

if (
isinstance(event, StateEvent)
and event.state == VacuumState.IDLE
and event_processing_data.last_event
and event_processing_data.last_event.state == VacuumState.DOCKED # type: ignore[attr-defined]
):
# todo distinguish better between docked and idle and outside event bus. # pylint: disable=fixme
# Problem getCleanInfo will return state=idle, when bot is charging
event = StateEvent(VacuumState.DOCKED) # type: ignore[assignment]
elif (
isinstance(event, AvailabilityEvent)
and event.available
and event_processing_data.last_event
and not event_processing_data.last_event.available # type: ignore[attr-defined]
):
# unavailable -> available: refresh everything
for event_type, _ in self._event_processing_dict.items():
if event_type != AvailabilityEvent:
self.request_refresh(event_type)

if event == event_processing_data.last_event:
_LOGGER.debug("Event is the same! Skipping (%s)", event)
return False

event_processing_data.last_event = event
if event_processing_data.subscriber_callbacks:
_LOGGER.debug("Notify subscribers with %s", event)
for callback in event_processing_data.subscriber_callbacks:
create_task(self._tasks, callback(event))
return True

_LOGGER.debug("No subscribers... Discharging %s", event)
return False
handle := event_processing_data.notify_handle
) is not None and not handle.cancelled():
handle.cancel()

def _notify(event: T) -> None:
event_processing_data.notify_handle = None

if (
isinstance(event, StateEvent)
and event.state == VacuumState.IDLE
and event_processing_data.last_event
and event_processing_data.last_event.state == VacuumState.DOCKED # type: ignore[attr-defined]
):
# todo distinguish better between docked and idle and outside event bus. # pylint: disable=fixme
# Problem getCleanInfo will return state=idle, when bot is charging
event = StateEvent(VacuumState.DOCKED) # type: ignore[assignment]
elif (
isinstance(event, AvailabilityEvent)
and event.available
and event_processing_data.last_event
and not event_processing_data.last_event.available # type: ignore[attr-defined]
):
# unavailable -> available: refresh everything
for event_type, _ in self._event_processing_dict.items():
if event_type != AvailabilityEvent:
self.request_refresh(event_type)

if event == event_processing_data.last_event:
_LOGGER.debug("Event is the same! Skipping (%s)", event)
return

event_processing_data.last_event = event
if event_processing_data.subscriber_callbacks:
_LOGGER.debug("Notify subscribers with %s", event)
for callback in event_processing_data.subscriber_callbacks:
create_task(self._tasks, callback(event))
else:
_LOGGER.debug("No subscribers... Discharging %s", event)

if debounce_time > 0:
event_processing_data.notify_handle = asyncio.get_running_loop().call_later(
debounce_time, _notify, event
)
else:
_notify(event)

def request_refresh(self, event_class: type[T]) -> None:
"""Request manual refresh."""
Expand All @@ -119,6 +133,9 @@
async def teardown(self) -> None:
"""Teardown eventbus."""
await cancel(self._tasks)
for data in self._event_processing_dict.values():
if handle := data.notify_handle:
handle.cancel()

Check warning on line 138 in deebot_client/events/event_bus.py

View check run for this annotation

Codecov / codecov/patch

deebot_client/events/event_bus.py#L138

Added line #L138 was not covered by tests

async def _call_refresh_function(self, event_class: type[T]) -> None:
semaphore = self._event_processing_dict[event_class].semaphore
Expand Down
2 changes: 1 addition & 1 deletion deebot_client/events/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,4 @@ class CachedMapInfoEvent(Event):
class MapChangedEvent(Event):
"""Map changed event."""

when: datetime = datetime.utcnow()
when: datetime
10 changes: 4 additions & 6 deletions deebot_client/map.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import struct
import zlib
from collections.abc import Callable, Coroutine
from datetime import datetime, timedelta
from datetime import datetime, timezone
from io import BytesIO
from typing import Any, Final

Expand Down Expand Up @@ -539,11 +539,9 @@ def __init__(self, event_bus: EventBus) -> None:

def on_change() -> None:
self._changed = True
now = datetime.utcnow()
last_event = event_bus.get_last_event(MapChangedEvent)
if last_event is None or (now - last_event.when) > timedelta(seconds=1):
# throttle notify to ones a second
event_bus.notify(MapChangedEvent(now))
event_bus.notify(
MapChangedEvent(datetime.now(timezone.utc)), debounce_time=1
)

self._on_change = on_change
self._map_pieces: OnChangedList[MapPiece] = OnChangedList(
Expand Down
69 changes: 68 additions & 1 deletion tests/events/test_event_bus.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import asyncio
from collections.abc import Callable
from unittest.mock import AsyncMock, call
from datetime import datetime, timezone
from unittest.mock import AsyncMock, call, patch

import pytest

from deebot_client.events import AvailabilityEvent, BatteryEvent, StateEvent
from deebot_client.events.base import Event
from deebot_client.events.const import EVENT_DTO_REFRESH_COMMANDS
from deebot_client.events.event_bus import EventBus
from deebot_client.events.map import MapChangedEvent
from deebot_client.models import VacuumState


Expand Down Expand Up @@ -146,3 +148,68 @@ async def notify(state: VacuumState) -> None:
mock.assert_called_once_with(StateEvent(expected))
else:
assert event_bus.get_last_event(StateEvent) == StateEvent(last)


@pytest.mark.parametrize(
"debounce_time",
[-1, 0, 1],
)
async def test_debounce_time(event_bus: EventBus, debounce_time: float) -> None:
async def notify(event: MapChangedEvent, debounce_time: float) -> None:
event_bus.notify(event, debounce_time=debounce_time)
await asyncio.sleep(0.1)

mock = AsyncMock()
event_bus.subscribe(MapChangedEvent, mock)

with patch("deebot_client.events.event_bus.asyncio", wraps=asyncio) as aio:

async def test_cycle() -> MapChangedEvent:
event = MapChangedEvent(datetime.now(timezone.utc))
await notify(event, debounce_time)
if debounce_time > 0:
aio.get_running_loop.assert_called()
mock.assert_not_called()
else:
aio.get_running_loop.assert_not_called()
mock.assert_called_once_with(event)
mock.reset_mock()

return event

for _ in range(2):
await test_cycle()
event = await test_cycle()

if debounce_time > 0:
await asyncio.sleep(debounce_time)
mock.assert_called_once_with(event)
mock.reset_mock()


async def test_teardown(event_bus: EventBus, execute_mock: AsyncMock) -> None:
# setup
async def wait() -> None:
await asyncio.sleep(1000)

execute_mock.side_effect = wait

mock = AsyncMock()
event_bus.subscribe(BatteryEvent, mock)

event_bus.notify(BatteryEvent(100), debounce_time=10000)
event_bus.request_refresh(BatteryEvent)

# verify tasks/handle still running
handle = event_bus._event_processing_dict[BatteryEvent].notify_handle
assert handle is not None
assert handle.cancelled() is False
assert len(event_bus._tasks) > 0

# test
await event_bus.teardown()

# verify
assert handle is not None
assert handle.cancelled() is True
assert len(event_bus._tasks) == 0
40 changes: 17 additions & 23 deletions tests/test_map.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from time import sleep
from unittest.mock import Mock
import asyncio
from unittest.mock import AsyncMock

import pytest

from deebot_client.events.event_bus import EventBus
from deebot_client.events.map import MapChangedEvent, Position, PositionType
from deebot_client.map import MapData, _calc_point
from deebot_client.models import Room
Expand All @@ -25,33 +26,26 @@ def test_calc_point(
assert result == expected


def test_MapData() -> None:
last_event: MapChangedEvent | None = None

def get_last_event(_: type[MapChangedEvent]) -> MapChangedEvent | None:
nonlocal last_event
return last_event

def notify(event: MapChangedEvent) -> None:
nonlocal last_event
last_event = event

event_bus = Mock()
event_bus.get_last_event.side_effect = get_last_event
event_bus.notify.side_effect = notify
async def test_MapData(event_bus: EventBus) -> None:
mock = AsyncMock()
event_bus.subscribe(MapChangedEvent, mock)

map_data = MapData(event_bus)

def test_cycle() -> None:
for x in range(4):
async def test_cycle() -> None:
for x in range(10000):
map_data.positions.append(Position(PositionType.DEEBOT, x, x))
map_data.rooms[x] = Room("test", x, "1,2")

assert event_bus.notify.call_count == 1
assert map_data.changed is True
mock.assert_not_called()

await asyncio.sleep(1.1)
mock.assert_called_once()

test_cycle()
await test_cycle()

event_bus.reset_mock()
sleep(1.1)
mock.reset_mock()
map_data.reset_changed()

test_cycle()
await test_cycle()