Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/7694 executor cleanup #7765

Closed
wants to merge 59 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
0d980fd
[WIP]
Hugo-Inmanta Jun 21, 2024
ed30a8c
[WIP]
Hugo-Inmanta Jun 24, 2024
ca300eb
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 24, 2024
2578db9
[WIP]
Hugo-Inmanta Jun 24, 2024
996c9a5
[WIP]
Hugo-Inmanta Jun 25, 2024
0531ecb
[WIP]
Hugo-Inmanta Jun 25, 2024
368a2d3
[WIP]
Hugo-Inmanta Jun 25, 2024
77b38ba
[WIP]
Hugo-Inmanta Jun 25, 2024
9ccb204
[WIP]
Hugo-Inmanta Jun 25, 2024
4d633a8
[WIP]
Hugo-Inmanta Jun 26, 2024
f216ec4
[WIP]
Hugo-Inmanta Jun 26, 2024
b120b1a
[WIP]
Hugo-Inmanta Jun 26, 2024
750f0a3
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 26, 2024
400f1d7
[WIP]
Hugo-Inmanta Jun 26, 2024
dd1cea8
[WIP]
Hugo-Inmanta Jun 26, 2024
78063a9
[WIP]
Hugo-Inmanta Jun 26, 2024
7181c79
[WIP]
Hugo-Inmanta Jun 26, 2024
9470de1
[WIP]
Hugo-Inmanta Jun 26, 2024
44c2622
[WIP]
Hugo-Inmanta Jun 26, 2024
ef2851f
[WIP]
Hugo-Inmanta Jun 26, 2024
fcd7eb4
[WIP]
Hugo-Inmanta Jun 26, 2024
8170cc7
[WIP]
Hugo-Inmanta Jun 26, 2024
42f2455
[WIP]
Hugo-Inmanta Jun 26, 2024
25c95e0
[WIP]
Hugo-Inmanta Jun 27, 2024
32c98cd
[WIP]
Hugo-Inmanta Jun 27, 2024
8c58031
[WIP]
Hugo-Inmanta Jun 27, 2024
76ee7ac
[WIP]
Hugo-Inmanta Jun 28, 2024
2653273
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 28, 2024
2a715fd
[WIP]
Hugo-Inmanta Jun 28, 2024
c9239d9
[WIP]
Hugo-Inmanta Jun 28, 2024
5532ff4
[WIP]
Hugo-Inmanta Jun 28, 2024
33fd080
[WIP]
Hugo-Inmanta Jun 28, 2024
a35b38d
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 28, 2024
76a2842
[WIP]
Hugo-Inmanta Jun 28, 2024
1a6682a
[WIP]
Hugo-Inmanta Jul 1, 2024
74c2c50
[WIP]
Hugo-Inmanta Jul 1, 2024
79b04f7
[WIP]
Hugo-Inmanta Jul 2, 2024
ed14455
[WIP]
Hugo-Inmanta Jul 2, 2024
f2e10d0
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 2, 2024
9fd9db2
[WIP]
Hugo-Inmanta Jul 2, 2024
61c8aa2
[WIP]
Hugo-Inmanta Jul 2, 2024
f2954c9
[WIP]
Hugo-Inmanta Jul 2, 2024
895b619
[WIP]
Hugo-Inmanta Jul 2, 2024
fffd6a1
[WIP]
Hugo-Inmanta Jul 3, 2024
cf7a1c7
[WIP]
Hugo-Inmanta Jul 3, 2024
a0469d5
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 3, 2024
9d4f7f1
[WIP]
Hugo-Inmanta Jul 3, 2024
8d5963a
Update src/inmanta/config.py
Hugo-Inmanta Jul 3, 2024
558cdde
[WIP] increase retention time slightly to avoid unwanted cleanups
Hugo-Inmanta Jul 3, 2024
b536897
[WIP]
Hugo-Inmanta Jul 3, 2024
92f4b90
[WIP]
Hugo-Inmanta Jul 3, 2024
142cfc0
[WIP]
Hugo-Inmanta Jul 3, 2024
08ee9e9
[WIP]
Hugo-Inmanta Jul 3, 2024
025a81b
[WIP]
Hugo-Inmanta Jul 3, 2024
060a005
[WIP]
Hugo-Inmanta Jul 3, 2024
e815992
[WIP]
Hugo-Inmanta Jul 3, 2024
2bedb4a
[WIP]
Hugo-Inmanta Jul 3, 2024
9b7c7b9
[WIP]
Hugo-Inmanta Jul 3, 2024
7cba0da
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/inmanta/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""

import enum
import functools
import logging
import typing
import uuid
Expand Down Expand Up @@ -188,7 +189,7 @@ def is_executor_mode(value: str | AgentExecutorMode) -> AgentExecutorMode:
3,
"Maximum number of concurrent executors to keep per environment, per agent. If this limit is already reached "
"when creating a new executor, the oldest one will be stopped first.",
is_int,
functools.partial(is_lower_bounded_int, 1),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use the docstring of the function to generate documentation, so this won't look so pretty in the docs

)

agent_executor_retention_time = Option[int](
Expand Down
76 changes: 8 additions & 68 deletions src/inmanta/agent/forking_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import uuid
from abc import abstractmethod
from asyncio import Future, transports
from typing import Any, Coroutine, Optional, Sequence
from typing import Optional, Sequence

import inmanta.agent.cache
import inmanta.agent.executor
Expand Down Expand Up @@ -606,13 +606,9 @@ def __init__(
self.max_executors_per_agent = inmanta.agent.config.agent_executor_cap.get()

self.stopping: bool = False
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've already given more comment than I'm comfortable, so feel free to ignore this one: when used like this, I usually call this 'running' (and invert it) to make sure it doesn't only indicate 'stopping' but also 'stopped'

# Keep a reference to all active backgrounds tasks to make sure they don't disappear
# mid-execution and to be able to cancel them when shutting down.
self.background_tasks: set[asyncio.Task[None]] = set()

# Keep a reference to the last scheduled cleanup task for easy cancelation
# and rescheduling when the retention time is updated
self.next_executor_cleanup_task: Optional[asyncio.Task[None]] = None
# We keep a reference to the periodic cleanup task to prevent it
# from disappearing mid-execution https://docs.python.org/3.11/library/asyncio-task.html#creating-tasks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, nice!

self.cleanup_job: Optional[asyncio.Task[None]] = None

def __add_executor(self, theid: executor.ExecutorId, the_executor: MPExecutor) -> None:
self.executor_map[theid] = the_executor
Expand Down Expand Up @@ -784,13 +780,10 @@ def _make_child(self, name: str, log_level: int, cli_log: bool) -> tuple[multipr
return p, parent_conn

async def start(self) -> None:
self.start_executor_retention_time_watcher()
self.start_periodic_executor_cleanup()
self.cleanup_job = asyncio.create_task(self.cleanup_inactive_executors())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the join method, you could await self.cleanup_job to make sure it is down.


async def stop(self) -> None:
self.stopping = True
for task in self.background_tasks:
task.cancel()
await asyncio.gather(*(child.stop() for child in self.children))

async def force_stop(self, grace_time: float) -> None:
Expand All @@ -806,49 +799,11 @@ async def stop_for_agent(self, agent_name: str) -> list[MPExecutor]:
await asyncio.gather(*(child.stop() for child in children))
return children

def start_executor_retention_time_watcher(self) -> None:
"""
Schedule a task that periodically checks for updates to the executor
retention time. If such an update is detected, the previous recurring cleanup
task is canceled and a new recurring cleanup task with the updated retention time
is scheduled.
"""

async def watch_retention_time() -> None:
if self.stopping:
# Stop periodic re-scheduling when the manager is shutting down
return

retention_time_from_config = inmanta.agent.config.agent_executor_retention_time.get()

if self.executor_retention_time != retention_time_from_config:
self.executor_retention_time = retention_time_from_config
self.start_periodic_executor_cleanup(restart=True)

await asyncio.sleep(2)
self.add_background_task(watch_retention_time())

self.add_background_task(watch_retention_time())

def start_periodic_executor_cleanup(self, restart: Optional[bool] = False) -> None:
async def cleanup_inactive_executors(self) -> None:
"""
Schedule a task that removes inactive executors and re-schedules
itself based on the retention time.

:param restart: Cancel the previously scheduled cleanup task and start a new one.
This task periodically cleans up idle executors
"""

async def cleanup_inactive_executors() -> None:
"""
This task cleans up idle executors and reschedules itself
"""
if self.stopping:
# Stop periodic re-scheduling when the manager is shutting down
return

if restart and self.next_executor_cleanup_task:
self.next_executor_cleanup_task.cancel()

while not self.stopping:
cleanup_start = datetime.datetime.now().astimezone()

reschedule_interval: float = self.executor_retention_time
Expand Down Expand Up @@ -884,18 +839,3 @@ async def cleanup_inactive_executors() -> None:
cleanup_end = datetime.datetime.now().astimezone()

await asyncio.sleep(max(0.0, reschedule_interval - (cleanup_end - cleanup_start).total_seconds()))
self.next_executor_cleanup_task = self.add_background_task(cleanup_inactive_executors())

self.add_background_task(cleanup_inactive_executors())

def add_background_task(self, coro: Coroutine[Any, Any, None]) -> asyncio.Task[None]:
"""
Wrap a coroutine in an asyncio Task and register it.
"""
task = asyncio.create_task(coro)

self.background_tasks.add(task)
# Make sure the task removes itself from the set when it's done
task.add_done_callback(self.background_tasks.discard)

return task
8 changes: 8 additions & 0 deletions src/inmanta/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,14 @@ def is_list(value: str | list[str]) -> list[str]:
return [] if value == "" else [x.strip() for x in value.split(",")]


def is_lower_bounded_int(lower_bound: int, value: str | int) -> int:
"""lower-bounded int"""
to_int = int(value)
if to_int < lower_bound:
raise ValueError(f"Value can not be lower than {lower_bound}")
return to_int
Hugo-Inmanta marked this conversation as resolved.
Show resolved Hide resolved


def is_map(map_in: str | typing.Mapping[str, str]) -> typing.Mapping[str, str]:
"""List of comma-separated key=value pairs"""
if isinstance(map_in, typing.Mapping):
Expand Down
9 changes: 2 additions & 7 deletions tests/forking_agent/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ def set_custom_executor_policy():
inmanta.agent.config.agent_executor_cap.set("2")

old_retention_value = inmanta.agent.config.agent_executor_retention_time.get()
# Clean up executors after 20s of inactivity
inmanta.agent.config.agent_executor_retention_time.set("20")
# Clean up executors after 2s of inactivity
inmanta.agent.config.agent_executor_retention_time.set("2")

yield

Expand All @@ -108,7 +108,6 @@ async def test_executor_server(set_custom_executor_policy, mpmanager: MPManager,
Also test that an executor policy can be set:
- the agent_executor_cap option correctly stops the oldest executor.
- the agent_executor_retention_time option is used to clean up old executors.
- updating the agent_executor_retention_time option correctly re-schedules the cleanup
"""

with pytest.raises(ImportError):
Expand Down Expand Up @@ -215,10 +214,6 @@ def test():
import lorem # noqa: F401, F811

async def check_automatic_clean_up() -> bool:
# Retention time is set to 20 prior to this
# Setting it to 2 tests that a new periodic cleanup is schedule when
# the setting is updated
inmanta.agent.config.agent_executor_retention_time.set("2")
return len(manager.agent_map["agent2"]) == 0

with caplog.at_level(logging.DEBUG):
Expand Down
17 changes: 17 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
Contact: [email protected]
"""

import functools
import logging
import os
import random
import socket

import netifaces
import pytest
from tornado import netutil

import inmanta.agent.config as cfg
Expand Down Expand Up @@ -347,3 +349,18 @@ def test_option_is_list_empty():
option: Option = Option("test", "list", "default,values", "documentation", cfg.is_list)
option.set("")
assert option.get() == []


def test_option_is_lower_bounded_int():
lower_bound = 1
option: Option = Option(
"test", "lb_int", lower_bound, "documentation", functools.partial(cfg.is_lower_bounded_int, lower_bound)
)
option.set("2")
assert option.get() == 2

option.set("0")
with pytest.raises(ValueError) as exc_info:
option.get()

assert f"Value can not be lower than {lower_bound}" in str(exc_info.value)