From 84fd1a02ebc2cf72f8181ebbbc4c03a5f46d7989 Mon Sep 17 00:00:00 2001 From: Mike Degatano Date: Thu, 5 Oct 2023 15:23:49 -0400 Subject: [PATCH] Unstarted jobs should always be cleaned up --- supervisor/jobs/__init__.py | 4 +- supervisor/jobs/decorator.py | 192 +++++++++++++++++-------------- tests/jobs/test_job_decorator.py | 58 ++++++++++ 3 files changed, 165 insertions(+), 89 deletions(-) diff --git a/supervisor/jobs/__init__.py b/supervisor/jobs/__init__.py index b987c1a7de8..48ab198f8a0 100644 --- a/supervisor/jobs/__init__.py +++ b/supervisor/jobs/__init__.py @@ -86,7 +86,7 @@ def as_dict(self) -> dict[str, Any]: } @contextmanager - def start(self, *, on_done: Callable[["SupervisorJob"], None] | None = None): + def start(self): """Start the job in the current task. This can only be called if the parent ID matches the job running in the current task. @@ -107,8 +107,6 @@ def start(self, *, on_done: Callable[["SupervisorJob"], None] | None = None): self.done = True if token: _CURRENT_JOB.reset(token) - if on_done: - on_done(self) class JobManager(FileConfiguration, CoreSysAttributes): diff --git a/supervisor/jobs/decorator.py b/supervisor/jobs/decorator.py index 0633ee11f41..050559118fa 100644 --- a/supervisor/jobs/decorator.py +++ b/supervisor/jobs/decorator.py @@ -201,95 +201,115 @@ async def wrapper(obj: JobGroup | CoreSysAttributes, *args, **kwargs) -> Any: internal=self._internal, ) - # Handle condition - if self.conditions: - try: - await Job.check_conditions( - self, set(self.conditions), self._method.__qualname__ - ) - except JobConditionException as err: - return self._handle_job_condition_exception(err) - - # Handle exection limits - if self.limit in (JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE): - await self._acquire_exection_limit() - elif self.limit in ( - JobExecutionLimit.GROUP_ONCE, - JobExecutionLimit.GROUP_WAIT, - ): - try: - await obj.acquire(job, self.limit == JobExecutionLimit.GROUP_WAIT) - except JobGroupExecutionLimitExceeded as err: - if self.on_condition: - raise self.on_condition(str(err)) from err - raise err - elif self.limit in ( - JobExecutionLimit.THROTTLE, - JobExecutionLimit.GROUP_THROTTLE, - ): - time_since_last_call = datetime.now() - self.last_call(group_name) - if time_since_last_call < self.throttle_period(group_name): - return - elif self.limit in ( - JobExecutionLimit.THROTTLE_WAIT, - JobExecutionLimit.GROUP_THROTTLE_WAIT, - ): - await self._acquire_exection_limit() - time_since_last_call = datetime.now() - self.last_call(group_name) - if time_since_last_call < self.throttle_period(group_name): - self._release_exception_limits() - return - elif self.limit in ( - JobExecutionLimit.THROTTLE_RATE_LIMIT, - JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, - ): - # Only reprocess array when necessary (at limit) - if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls: - self.set_rate_limited_calls( - [ - call - for call in self.rate_limited_calls(group_name) - if call > datetime.now() - self.throttle_period(group_name) - ], - group_name, - ) - - if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls: - on_condition = ( - JobException if self.on_condition is None else self.on_condition - ) - raise on_condition( - f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period(group_name)}", - ) - - # Execute Job - with job.start(on_done=self.sys_jobs.remove_job if self.cleanup else None): - try: - self.set_last_call(datetime.now(), group_name) - if self.rate_limited_calls(group_name) is not None: - self.add_rate_limited_call( - self.last_call(group_name), group_name + try: + # Handle condition + if self.conditions: + try: + await Job.check_conditions( + self, set(self.conditions), self._method.__qualname__ + ) + except JobConditionException as err: + return self._handle_job_condition_exception(err) + + # Handle exection limits + if self.limit in ( + JobExecutionLimit.SINGLE_WAIT, + JobExecutionLimit.ONCE, + ): + await self._acquire_exection_limit() + elif self.limit in ( + JobExecutionLimit.GROUP_ONCE, + JobExecutionLimit.GROUP_WAIT, + ): + try: + await obj.acquire( + job, self.limit == JobExecutionLimit.GROUP_WAIT + ) + except JobGroupExecutionLimitExceeded as err: + if self.on_condition: + raise self.on_condition(str(err)) from err + raise err + elif self.limit in ( + JobExecutionLimit.THROTTLE, + JobExecutionLimit.GROUP_THROTTLE, + ): + time_since_last_call = datetime.now() - self.last_call(group_name) + if time_since_last_call < self.throttle_period(group_name): + return + elif self.limit in ( + JobExecutionLimit.THROTTLE_WAIT, + JobExecutionLimit.GROUP_THROTTLE_WAIT, + ): + await self._acquire_exection_limit() + time_since_last_call = datetime.now() - self.last_call(group_name) + if time_since_last_call < self.throttle_period(group_name): + self._release_exception_limits() + return + elif self.limit in ( + JobExecutionLimit.THROTTLE_RATE_LIMIT, + JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT, + ): + # Only reprocess array when necessary (at limit) + if ( + len(self.rate_limited_calls(group_name)) + >= self.throttle_max_calls + ): + self.set_rate_limited_calls( + [ + call + for call in self.rate_limited_calls(group_name) + if call + > datetime.now() - self.throttle_period(group_name) + ], + group_name, ) - return await self._method(obj, *args, **kwargs) - - # If a method has a conditional JobCondition, they must check it in the method - # These should be handled like normal JobConditions as much as possible - except JobConditionException as err: - return self._handle_job_condition_exception(err) - except HassioError as err: - raise err - except Exception as err: - _LOGGER.exception("Unhandled exception: %s", err) - capture_exception(err) - raise JobException() from err - finally: - self._release_exception_limits() - if self.limit in ( - JobExecutionLimit.GROUP_ONCE, - JobExecutionLimit.GROUP_WAIT, + if ( + len(self.rate_limited_calls(group_name)) + >= self.throttle_max_calls ): - obj.release() + on_condition = ( + JobException + if self.on_condition is None + else self.on_condition + ) + raise on_condition( + f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period(group_name)}", + ) + + # Execute Job + with job.start(): + try: + self.set_last_call(datetime.now(), group_name) + if self.rate_limited_calls(group_name) is not None: + self.add_rate_limited_call( + self.last_call(group_name), group_name + ) + + return await self._method(obj, *args, **kwargs) + + # If a method has a conditional JobCondition, they must check it in the method + # These should be handled like normal JobConditions as much as possible + except JobConditionException as err: + return self._handle_job_condition_exception(err) + except HassioError as err: + raise err + except Exception as err: + _LOGGER.exception("Unhandled exception: %s", err) + capture_exception(err) + raise JobException() from err + finally: + self._release_exception_limits() + if self.limit in ( + JobExecutionLimit.GROUP_ONCE, + JobExecutionLimit.GROUP_WAIT, + ): + obj.release() + + # Jobs that weren't started are always cleaned up. Also clean up done jobs if required + finally: + if job.done is None or self.cleanup: + self.sys_jobs.remove_job(job) return wrapper diff --git a/tests/jobs/test_job_decorator.py b/tests/jobs/test_job_decorator.py index ffa5e73d0cd..85382d24506 100644 --- a/tests/jobs/test_job_decorator.py +++ b/tests/jobs/test_job_decorator.py @@ -1044,3 +1044,61 @@ async def job_await_inner(self): await test.job_await() await test.job_release() await task + + +async def test_job_always_removed_on_check_failure(coresys: CoreSys): + """Test that the job instance is always removed if the condition or limit check fails.""" + + class TestClass: + """Test class.""" + + event = asyncio.Event() + limit_job: Job | None = None + + def __init__(self, coresys: CoreSys) -> None: + """Initialize object.""" + self.coresys = coresys + + @Job( + name="test_job_always_removed_on_check_failure_condition", + conditions=[JobCondition.HAOS], + on_condition=JobException, + cleanup=False, + ) + async def condition_check(self): + """Job that will fail a condition check.""" + raise AssertionError("should not run") + + @Job( + name="test_job_always_removed_on_check_failure_limit", + limit=JobExecutionLimit.ONCE, + cleanup=False, + ) + async def limit_check(self): + """Job that can fail a limit check.""" + self.limit_job = self.coresys.jobs.current + await self.event.wait() + + def release_limit_check(self): + """Release the limit check job.""" + self.event.set() + + test = TestClass(coresys) + + with pytest.raises(JobException): + await test.condition_check() + assert coresys.jobs.jobs == [] + + task = coresys.create_task(test.limit_check()) + await asyncio.sleep(0) + assert (job := test.limit_job) + + with pytest.raises(JobException): + await test.limit_check() + assert test.limit_job == job + assert coresys.jobs.jobs == [job] + + test.release_limit_check() + await task + assert job.done + assert coresys.jobs.jobs == [job]