Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor scheduler #1935

Merged
merged 2 commits into from
Dec 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# Version 2024.12.10 (2024-12-21)

- Refactor scheduler

# Version 2024.12.9 (2024-12-20)

- Add periodic checks for device firmware updates
Expand Down
164 changes: 86 additions & 78 deletions hahomematic/central/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1540,14 +1540,34 @@ def __init__(self, central: CentralUnit) -> None:
threading.Thread.__init__(self, name=f"ConnectionChecker for {central.name}")
self._central: Final = central
self._active = True
self._central_is_connected = True
self._next_check_connection_run = datetime.now()
self._next_refresh_client_data_run = datetime.now()
self._next_refresh_program_data_run = datetime.now()
self._next_refresh_sysvar_data_run = datetime.now()
self._next_fetch_device_firmware_update_data_run = datetime.now()
self._next_fetch_device_firmware_update_data_in_delivery_run = datetime.now()
self._next_fetch_device_firmware_update_data_in_update_run = datetime.now()
self._scheduler_jobs = [
_SchedulerJob(
task=self._check_connection, run_interval=config.CONNECTION_CHECKER_INTERVAL
),
_SchedulerJob(
task=self._refresh_client_data,
run_interval=self._central.config.periodic_refresh_interval,
),
_SchedulerJob(
task=self._refresh_program_data,
run_interval=self._central.config.sys_scan_interval,
),
_SchedulerJob(
task=self._refresh_sysvar_data, run_interval=self._central.config.sys_scan_interval
),
_SchedulerJob(
task=self._fetch_device_firmware_update_data,
run_interval=DEVICE_FIRMWARE_CHECK_INTERVAL,
),
_SchedulerJob(
task=self._fetch_device_firmware_update_data_in_delivery,
run_interval=DEVICE_FIRMWARE_DELIVERING_CHECK_INTERVAL,
),
_SchedulerJob(
task=self._fetch_device_firmware_update_data_in_update,
run_interval=DEVICE_FIRMWARE_UPDATING_CHECK_INTERVAL,
),
]

def run(self) -> None:
"""Run the scheduler thread."""
Expand All @@ -1557,35 +1577,27 @@ def run(self) -> None:
)

self._central.looper.create_task(
self._run_tasks(),
self._run_scheduler_tasks(),
name="run_scheduler_tasks",
)

def stop(self) -> None:
"""To stop the ConnectionChecker."""
self._active = False

async def _run_tasks(self) -> None:
async def _run_scheduler_tasks(self) -> None:
"""Run all tasks."""
while self._active:
await self._check_connection()
if (poll_clients := self._central.poll_clients) is not None:
await self._refresh_client_data(poll_clients=poll_clients)
if self._central.config.enable_sysvar_scan:
await self._refresh_sysvar_data()
if self._central.config.enable_program_scan:
await self._refresh_program_data()
if self._central.config.enable_device_firmware_check:
await self._fetch_device_firmware_update_data()
await self._fetch_device_firmware_update_data_in_delivery()
await self._fetch_device_firmware_update_data_in_update()
for job in self._scheduler_jobs:
if not self._active or not job.ready:
continue
await job.run()
job.schedule_next_execution()
if self._active:
await asyncio.sleep(10)
await asyncio.sleep(5)

async def _check_connection(self) -> None:
"""Check connection to backend."""
if not self._active or self._next_check_connection_run > datetime.now():
return
_LOGGER.debug("CHECK_CONNECTION: Checking connection to server %s", self._central.name)
try:
if not self._central.has_all_enabled_clients:
Expand Down Expand Up @@ -1627,81 +1639,57 @@ async def _check_connection(self) -> None:
type(ex).__name__,
reduce_args(args=ex.args),
)
self._next_check_connection_run += timedelta(seconds=config.CONNECTION_CHECKER_INTERVAL)

@service(re_raise=False)
async def _refresh_client_data(self, poll_clients: tuple[hmcl.Client, ...]) -> None:
async def _refresh_client_data(self) -> None:
"""Refresh client data."""
if (
not self._active
or not self._central.available
or self._next_refresh_client_data_run > datetime.now()
):
if not self._central.available:
return
_LOGGER.debug("REFRESH_CLIENT_DATA: Checking connection to server %s", self._central.name)
for client in poll_clients:
await self._central.load_and_refresh_data_point_data(interface=client.interface)
self._central.set_last_event_dt(interface_id=client.interface_id)

self._next_refresh_client_data_run += timedelta(
seconds=self._central.config.periodic_refresh_interval
)
if (poll_clients := self._central.poll_clients) is not None and len(poll_clients) > 0:
_LOGGER.debug(
"REFRESH_CLIENT_DATA: Checking connection to server %s", self._central.name
)
for client in poll_clients:
await self._central.load_and_refresh_data_point_data(interface=client.interface)
self._central.set_last_event_dt(interface_id=client.interface_id)

@service(re_raise=False)
async def _refresh_sysvar_data(self) -> None:
"""Refresh system variables."""
if (
not self._active
or not self._central.available
or self._next_refresh_sysvar_data_run > datetime.now()
):
if not self._central.config.enable_sysvar_scan or not self._central.available:
return

_LOGGER.debug("REFRESH_SYSVAR_DATA: For %s", self._central.name)
await self._central.fetch_sysvar_data(scheduled=True)
self._next_refresh_sysvar_data_run += timedelta(
seconds=self._central.config.sys_scan_interval
)

@service(re_raise=False)
async def _refresh_program_data(self) -> None:
"""Refresh system program_data."""
if (
not self._active
or not self._central.available
or self._next_refresh_program_data_run > datetime.now()
):
if not self._central.config.enable_program_scan or not self._central.available:
return

_LOGGER.debug("REFRESH_PROGRAM_DATA: For %s", self._central.name)
await self._central.fetch_program_data(scheduled=True)
self._next_refresh_program_data_run += timedelta(
seconds=self._central.config.sys_scan_interval
)

@service(re_raise=False)
async def _fetch_device_firmware_update_data(self) -> None:
"""Periodically fetch device firmware update data from backend."""
if (
not self._active
or not self._central.available
or self._next_fetch_device_firmware_update_data_run > datetime.now()
):
if not self._central.config.enable_device_firmware_check or not self._central.available:
return

_LOGGER.debug(
"FETCH_DEVICE_FIRMWARE_UPDATE_DATA: Scheduled fetching of device firmware update data for %s",
self._central.name,
)
await self._central.refresh_firmware_data()
self._next_fetch_device_firmware_update_data_run += timedelta(
seconds=DEVICE_FIRMWARE_CHECK_INTERVAL
)

@service(re_raise=False)
async def _fetch_device_firmware_update_data_in_delivery(self) -> None:
"""Periodically fetch device firmware update data from backend."""
if (
not self._active
or not self._central.available
or self._next_fetch_device_firmware_update_data_in_delivery_run > datetime.now()
):
if not self._central.config.enable_device_firmware_check or not self._central.available:
return

_LOGGER.debug(
"FETCH_DEVICE_FIRMWARE_UPDATE_DATA_IN_DELIVERY: Scheduled fetching of device firmware update data for delivering devices for %s",
self._central.name,
Expand All @@ -1712,18 +1700,13 @@ async def _fetch_device_firmware_update_data_in_delivery(self) -> None:
DeviceFirmwareState.LIVE_DELIVER_FIRMWARE_IMAGE,
)
)
self._next_fetch_device_firmware_update_data_in_delivery_run += timedelta(
seconds=DEVICE_FIRMWARE_DELIVERING_CHECK_INTERVAL
)

@service(re_raise=False)
async def _fetch_device_firmware_update_data_in_update(self) -> None:
"""Periodically fetch device firmware update data from backend."""
if (
not self._active
or not self._central.available
or self._next_fetch_device_firmware_update_data_in_update_run > datetime.now()
):
if not self._central.config.enable_device_firmware_check or not self._central.available:
return

_LOGGER.debug(
"FETCH_DEVICE_FIRMWARE_UPDATE_DATA_IN_UPDATE: Scheduled fetching of device firmware update data for updating devices for %s",
self._central.name,
Expand All @@ -1735,9 +1718,34 @@ async def _fetch_device_firmware_update_data_in_update(self) -> None:
DeviceFirmwareState.PERFORMING_UPDATE,
)
)
self._next_fetch_device_firmware_update_data_in_update_run += timedelta(
seconds=DEVICE_FIRMWARE_UPDATING_CHECK_INTERVAL
)


class _SchedulerJob:
"""Job to run in the scheduler."""

def __init__(
self,
task: Callable,
run_interval: int,
next_run: datetime = datetime.now(),
):
"""Init the job."""
self._task: Final = task
self._next_run = next_run
self._run_interval: Final = run_interval

@property
def ready(self) -> bool:
"""Return if the job can be executed."""
return self._next_run < datetime.now()

async def run(self) -> None:
"""Run the task."""
await self._task()

def schedule_next_execution(self) -> None:
"""Schedule the next execution of the job."""
self._next_run += timedelta(seconds=self._run_interval)


class CentralConfig:
Expand Down
2 changes: 1 addition & 1 deletion hahomematic/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import re
from typing import Any, Final, Required, TypedDict

VERSION: Final = "2024.12.9"
VERSION: Final = "2024.12.10"

DEFAULT_CONNECTION_CHECKER_INTERVAL: Final = 15 # check if connection is available via rpc ping
DEFAULT_CUSTOM_ID: Final = "custom_id"
Expand Down
Loading