Unstarted jobs should always be cleaned up (#4604)
mdegat01 authored Oct 9, 2023
1 parent f984030 commit ace58ba
Showing 3 changed files with 165 additions and 89 deletions.
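
In short: the wrapper in decorator.py creates the SupervisorJob record before any condition or execution-limit checks run. Previously the record was only removed through the on_done callback passed to job.start(), so a job that failed its checks (and therefore never started) stayed registered. The diff below moves removal into an outer try/finally: any job whose done flag is still None is dropped, and finished jobs are dropped too when cleanup is set. A minimal, self-contained sketch of that pattern (illustrative names only; JobRecord, JobRegistry and run_wrapped are not Supervisor APIs):

class JobRecord:
    def __init__(self, name: str) -> None:
        self.name = name
        self.done = None  # stays None until the job actually runs to completion


class JobRegistry:
    def __init__(self) -> None:
        self.jobs = []

    def new_job(self, name: str) -> JobRecord:
        job = JobRecord(name)
        self.jobs.append(job)
        return job

    def remove_job(self, job: JobRecord) -> None:
        self.jobs.remove(job)


def run_wrapped(registry: JobRegistry, checks_pass: bool, cleanup: bool) -> None:
    """Mirror the new wrapper layout: checks inside the try, removal in the outer finally."""
    job = registry.new_job("example")
    try:
        if not checks_pass:
            raise RuntimeError("condition or execution-limit check failed")
        # The real wrapper enters job.start() here and runs the decorated method.
        job.done = True
    finally:
        # Jobs that were never started are always cleaned up;
        # finished jobs are cleaned up only when cleanup is requested.
        if job.done is None or cleanup:
            registry.remove_job(job)


registry = JobRegistry()
try:
    run_wrapped(registry, checks_pass=False, cleanup=False)
except RuntimeError:
    pass
assert registry.jobs == []  # the failed, never-started job no longer leaks

run_wrapped(registry, checks_pass=True, cleanup=False)
assert len(registry.jobs) == 1  # a completed job is kept when cleanup=False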
4 changes: 1 addition & 3 deletions supervisor/jobs/__init__.py
@@ -86,7 +86,7 @@ def as_dict(self) -> dict[str, Any]:
         }
 
     @contextmanager
-    def start(self, *, on_done: Callable[["SupervisorJob"], None] | None = None):
+    def start(self):
         """Start the job in the current task.
 
         This can only be called if the parent ID matches the job running in the current task.
@@ -107,8 +107,6 @@ def start(self, *, on_done: Callable[["SupervisorJob"], None] | None = None):
             self.done = True
             if token:
                 _CURRENT_JOB.reset(token)
-            if on_done:
-                on_done(self)
 
 
 class JobManager(FileConfiguration, CoreSysAttributes):
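
With removal handled by the wrapper's outer finally, start() no longer needs an on_done hook; its remaining responsibility is to mark the job as running and then done for the current task. A rough sketch of that behaviour (SketchJob is a made-up stand-in and omits the parent-ID check and context-variable handling of the real method):

from contextlib import contextmanager


class SketchJob:
    """Simplified stand-in for SupervisorJob: only the done bookkeeping is shown."""

    def __init__(self) -> None:
        self.done = None  # None means the job was never started

    @contextmanager
    def start(self):
        # The real start() also validates the parent job and manages the
        # current-job context variable; that part is omitted here.
        try:
            yield self
        finally:
            self.done = True


job = SketchJob()
assert job.done is None  # "never started", which is what the wrapper's finally now checks
with job.start():
    pass  # the decorated method body runs here
assert job.done is True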
192 changes: 106 additions & 86 deletions supervisor/jobs/decorator.py
@@ -201,95 +201,115 @@ async def wrapper(obj: JobGroup | CoreSysAttributes, *args, **kwargs) -> Any:
                 internal=self._internal,
             )
 
-            # Handle condition
-            if self.conditions:
-                try:
-                    await Job.check_conditions(
-                        self, set(self.conditions), self._method.__qualname__
-                    )
-                except JobConditionException as err:
-                    return self._handle_job_condition_exception(err)
-
-            # Handle exection limits
-            if self.limit in (JobExecutionLimit.SINGLE_WAIT, JobExecutionLimit.ONCE):
-                await self._acquire_exection_limit()
-            elif self.limit in (
-                JobExecutionLimit.GROUP_ONCE,
-                JobExecutionLimit.GROUP_WAIT,
-            ):
-                try:
-                    await obj.acquire(job, self.limit == JobExecutionLimit.GROUP_WAIT)
-                except JobGroupExecutionLimitExceeded as err:
-                    if self.on_condition:
-                        raise self.on_condition(str(err)) from err
-                    raise err
-            elif self.limit in (
-                JobExecutionLimit.THROTTLE,
-                JobExecutionLimit.GROUP_THROTTLE,
-            ):
-                time_since_last_call = datetime.now() - self.last_call(group_name)
-                if time_since_last_call < self.throttle_period(group_name):
-                    return
-            elif self.limit in (
-                JobExecutionLimit.THROTTLE_WAIT,
-                JobExecutionLimit.GROUP_THROTTLE_WAIT,
-            ):
-                await self._acquire_exection_limit()
-                time_since_last_call = datetime.now() - self.last_call(group_name)
-                if time_since_last_call < self.throttle_period(group_name):
-                    self._release_exception_limits()
-                    return
-            elif self.limit in (
-                JobExecutionLimit.THROTTLE_RATE_LIMIT,
-                JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
-            ):
-                # Only reprocess array when necessary (at limit)
-                if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
-                    self.set_rate_limited_calls(
-                        [
-                            call
-                            for call in self.rate_limited_calls(group_name)
-                            if call > datetime.now() - self.throttle_period(group_name)
-                        ],
-                        group_name,
-                    )
-
-                if len(self.rate_limited_calls(group_name)) >= self.throttle_max_calls:
-                    on_condition = (
-                        JobException if self.on_condition is None else self.on_condition
-                    )
-                    raise on_condition(
-                        f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
-                    )
-
-            # Execute Job
-            with job.start(on_done=self.sys_jobs.remove_job if self.cleanup else None):
-                try:
-                    self.set_last_call(datetime.now(), group_name)
-                    if self.rate_limited_calls(group_name) is not None:
-                        self.add_rate_limited_call(
-                            self.last_call(group_name), group_name
-                        )
-
-                    return await self._method(obj, *args, **kwargs)
-
-                # If a method has a conditional JobCondition, they must check it in the method
-                # These should be handled like normal JobConditions as much as possible
-                except JobConditionException as err:
-                    return self._handle_job_condition_exception(err)
-                except HassioError as err:
-                    raise err
-                except Exception as err:
-                    _LOGGER.exception("Unhandled exception: %s", err)
-                    capture_exception(err)
-                    raise JobException() from err
-                finally:
-                    self._release_exception_limits()
-                    if self.limit in (
-                        JobExecutionLimit.GROUP_ONCE,
-                        JobExecutionLimit.GROUP_WAIT,
-                    ):
-                        obj.release()
+            try:
+                # Handle condition
+                if self.conditions:
+                    try:
+                        await Job.check_conditions(
+                            self, set(self.conditions), self._method.__qualname__
+                        )
+                    except JobConditionException as err:
+                        return self._handle_job_condition_exception(err)
+
+                # Handle exection limits
+                if self.limit in (
+                    JobExecutionLimit.SINGLE_WAIT,
+                    JobExecutionLimit.ONCE,
+                ):
+                    await self._acquire_exection_limit()
+                elif self.limit in (
+                    JobExecutionLimit.GROUP_ONCE,
+                    JobExecutionLimit.GROUP_WAIT,
+                ):
+                    try:
+                        await obj.acquire(
+                            job, self.limit == JobExecutionLimit.GROUP_WAIT
+                        )
+                    except JobGroupExecutionLimitExceeded as err:
+                        if self.on_condition:
+                            raise self.on_condition(str(err)) from err
+                        raise err
+                elif self.limit in (
+                    JobExecutionLimit.THROTTLE,
+                    JobExecutionLimit.GROUP_THROTTLE,
+                ):
+                    time_since_last_call = datetime.now() - self.last_call(group_name)
+                    if time_since_last_call < self.throttle_period(group_name):
+                        return
+                elif self.limit in (
+                    JobExecutionLimit.THROTTLE_WAIT,
+                    JobExecutionLimit.GROUP_THROTTLE_WAIT,
+                ):
+                    await self._acquire_exection_limit()
+                    time_since_last_call = datetime.now() - self.last_call(group_name)
+                    if time_since_last_call < self.throttle_period(group_name):
+                        self._release_exception_limits()
+                        return
+                elif self.limit in (
+                    JobExecutionLimit.THROTTLE_RATE_LIMIT,
+                    JobExecutionLimit.GROUP_THROTTLE_RATE_LIMIT,
+                ):
+                    # Only reprocess array when necessary (at limit)
+                    if (
+                        len(self.rate_limited_calls(group_name))
+                        >= self.throttle_max_calls
+                    ):
+                        self.set_rate_limited_calls(
+                            [
+                                call
+                                for call in self.rate_limited_calls(group_name)
+                                if call
+                                > datetime.now() - self.throttle_period(group_name)
+                            ],
+                            group_name,
+                        )
+
+                    if (
+                        len(self.rate_limited_calls(group_name))
+                        >= self.throttle_max_calls
+                    ):
+                        on_condition = (
+                            JobException
+                            if self.on_condition is None
+                            else self.on_condition
+                        )
+                        raise on_condition(
+                            f"Rate limit exceeded, more then {self.throttle_max_calls} calls in {self.throttle_period(group_name)}",
+                        )
+
+                # Execute Job
+                with job.start():
+                    try:
+                        self.set_last_call(datetime.now(), group_name)
+                        if self.rate_limited_calls(group_name) is not None:
+                            self.add_rate_limited_call(
+                                self.last_call(group_name), group_name
+                            )
+
+                        return await self._method(obj, *args, **kwargs)
+
+                    # If a method has a conditional JobCondition, they must check it in the method
+                    # These should be handled like normal JobConditions as much as possible
+                    except JobConditionException as err:
+                        return self._handle_job_condition_exception(err)
+                    except HassioError as err:
+                        raise err
+                    except Exception as err:
+                        _LOGGER.exception("Unhandled exception: %s", err)
+                        capture_exception(err)
+                        raise JobException() from err
+                    finally:
+                        self._release_exception_limits()
+                        if self.limit in (
+                            JobExecutionLimit.GROUP_ONCE,
+                            JobExecutionLimit.GROUP_WAIT,
+                        ):
+                            obj.release()
+
+            # Jobs that weren't started are always cleaned up. Also clean up done jobs if required
+            finally:
+                if job.done is None or self.cleanup:
+                    self.sys_jobs.remove_job(job)
 
         return wrapper
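
One path this restructuring covers, going by the diff above, is the throttle early return: when a THROTTLE-style limit makes the wrapper return before job.start(), done is still None, so the outer finally now removes the record even with cleanup=False. A toy illustration under those assumptions (Registry and throttled_call are made-up names, not Supervisor APIs):

import asyncio
from datetime import datetime, timedelta


class Registry:
    """Toy job registry, just enough to show the early-return path."""

    def __init__(self) -> None:
        self.jobs = []

    def new_job(self, name: str) -> dict:
        job = {"name": name, "done": None}
        self.jobs.append(job)
        return job

    def remove_job(self, job: dict) -> None:
        self.jobs.remove(job)


async def throttled_call(registry: Registry, last_call: datetime, period: timedelta) -> None:
    """Shape of a THROTTLE-limited wrapper call that bails out before job.start()."""
    job = registry.new_job("throttled")
    try:
        if datetime.now() - last_call < period:
            return  # throttled: the job is never started
        job["done"] = True  # the decorated method would run inside job.start() here
    finally:
        if job["done"] is None:  # cleanup=False assumed; unstarted jobs are removed anyway
            registry.remove_job(job)


registry = Registry()
asyncio.run(throttled_call(registry, datetime.now(), timedelta(seconds=30)))
assert registry.jobs == []  # the skipped call leaves no job record behind

The new test below exercises the same guarantee for a failed condition check and for a JobExecutionLimit.ONCE limit.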

58 changes: 58 additions & 0 deletions tests/jobs/test_job_decorator.py
@@ -1044,3 +1044,61 @@ async def job_await_inner(self):
     await test.job_await()
     await test.job_release()
     await task
+
+
+async def test_job_always_removed_on_check_failure(coresys: CoreSys):
+    """Test that the job instance is always removed if the condition or limit check fails."""
+
+    class TestClass:
+        """Test class."""
+
+        event = asyncio.Event()
+        limit_job: Job | None = None
+
+        def __init__(self, coresys: CoreSys) -> None:
+            """Initialize object."""
+            self.coresys = coresys
+
+        @Job(
+            name="test_job_always_removed_on_check_failure_condition",
+            conditions=[JobCondition.HAOS],
+            on_condition=JobException,
+            cleanup=False,
+        )
+        async def condition_check(self):
+            """Job that will fail a condition check."""
+            raise AssertionError("should not run")
+
+        @Job(
+            name="test_job_always_removed_on_check_failure_limit",
+            limit=JobExecutionLimit.ONCE,
+            cleanup=False,
+        )
+        async def limit_check(self):
+            """Job that can fail a limit check."""
+            self.limit_job = self.coresys.jobs.current
+            await self.event.wait()
+
+        def release_limit_check(self):
+            """Release the limit check job."""
+            self.event.set()
+
+    test = TestClass(coresys)
+
+    with pytest.raises(JobException):
+        await test.condition_check()
+    assert coresys.jobs.jobs == []
+
+    task = coresys.create_task(test.limit_check())
+    await asyncio.sleep(0)
+    assert (job := test.limit_job)
+
+    with pytest.raises(JobException):
+        await test.limit_check()
+    assert test.limit_job == job
+    assert coresys.jobs.jobs == [job]
+
+    test.release_limit_check()
+    await task
+    assert job.done
+    assert coresys.jobs.jobs == [job]
