Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/7694 executor cleanup #7765

Closed
Hugo-Inmanta wants to merge 59 commits into master from issue/7694-executor-cleanup
Closed
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
0d980fd
[WIP]
Hugo-Inmanta Jun 21, 2024
ed30a8c
[WIP]
Hugo-Inmanta Jun 24, 2024
ca300eb
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 24, 2024
2578db9
[WIP]
Hugo-Inmanta Jun 24, 2024
996c9a5
[WIP]
Hugo-Inmanta Jun 25, 2024
0531ecb
[WIP]
Hugo-Inmanta Jun 25, 2024
368a2d3
[WIP]
Hugo-Inmanta Jun 25, 2024
77b38ba
[WIP]
Hugo-Inmanta Jun 25, 2024
9ccb204
[WIP]
Hugo-Inmanta Jun 25, 2024
4d633a8
[WIP]
Hugo-Inmanta Jun 26, 2024
f216ec4
[WIP]
Hugo-Inmanta Jun 26, 2024
b120b1a
[WIP]
Hugo-Inmanta Jun 26, 2024
750f0a3
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 26, 2024
400f1d7
[WIP]
Hugo-Inmanta Jun 26, 2024
dd1cea8
[WIP]
Hugo-Inmanta Jun 26, 2024
78063a9
[WIP]
Hugo-Inmanta Jun 26, 2024
7181c79
[WIP]
Hugo-Inmanta Jun 26, 2024
9470de1
[WIP]
Hugo-Inmanta Jun 26, 2024
44c2622
[WIP]
Hugo-Inmanta Jun 26, 2024
ef2851f
[WIP]
Hugo-Inmanta Jun 26, 2024
fcd7eb4
[WIP]
Hugo-Inmanta Jun 26, 2024
8170cc7
[WIP]
Hugo-Inmanta Jun 26, 2024
42f2455
[WIP]
Hugo-Inmanta Jun 26, 2024
25c95e0
[WIP]
Hugo-Inmanta Jun 27, 2024
32c98cd
[WIP]
Hugo-Inmanta Jun 27, 2024
8c58031
[WIP]
Hugo-Inmanta Jun 27, 2024
76ee7ac
[WIP]
Hugo-Inmanta Jun 28, 2024
2653273
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 28, 2024
2a715fd
[WIP]
Hugo-Inmanta Jun 28, 2024
c9239d9
[WIP]
Hugo-Inmanta Jun 28, 2024
5532ff4
[WIP]
Hugo-Inmanta Jun 28, 2024
33fd080
[WIP]
Hugo-Inmanta Jun 28, 2024
a35b38d
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 28, 2024
76a2842
[WIP]
Hugo-Inmanta Jun 28, 2024
1a6682a
[WIP]
Hugo-Inmanta Jul 1, 2024
74c2c50
[WIP]
Hugo-Inmanta Jul 1, 2024
79b04f7
[WIP]
Hugo-Inmanta Jul 2, 2024
ed14455
[WIP]
Hugo-Inmanta Jul 2, 2024
f2e10d0
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 2, 2024
9fd9db2
[WIP]
Hugo-Inmanta Jul 2, 2024
61c8aa2
[WIP]
Hugo-Inmanta Jul 2, 2024
f2954c9
[WIP]
Hugo-Inmanta Jul 2, 2024
895b619
[WIP]
Hugo-Inmanta Jul 2, 2024
fffd6a1
[WIP]
Hugo-Inmanta Jul 3, 2024
cf7a1c7
[WIP]
Hugo-Inmanta Jul 3, 2024
a0469d5
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 3, 2024
9d4f7f1
[WIP]
Hugo-Inmanta Jul 3, 2024
8d5963a
Update src/inmanta/config.py
Hugo-Inmanta Jul 3, 2024
558cdde
[WIP] increase retention time slightly to avoid unwanted cleanups
Hugo-Inmanta Jul 3, 2024
b536897
[WIP]
Hugo-Inmanta Jul 3, 2024
92f4b90
[WIP]
Hugo-Inmanta Jul 3, 2024
142cfc0
[WIP]
Hugo-Inmanta Jul 3, 2024
08ee9e9
[WIP]
Hugo-Inmanta Jul 3, 2024
025a81b
[WIP]
Hugo-Inmanta Jul 3, 2024
060a005
[WIP]
Hugo-Inmanta Jul 3, 2024
e815992
[WIP]
Hugo-Inmanta Jul 3, 2024
2bedb4a
[WIP]
Hugo-Inmanta Jul 3, 2024
9b7c7b9
[WIP]
Hugo-Inmanta Jul 3, 2024
7cba0da
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelogs/unreleased/7694-executor-cleanup.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
description: Add policy to clean up old executors
issue-nr: 7694
change-type: patch
destination-branches: [master]
23 changes: 18 additions & 5 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,15 +639,26 @@ def periodic_schedule(
action: Callable[[], Coroutine[object, None, object]],
interval: Union[int, str],
splay_value: int,
initial_time: datetime.datetime,
) -> bool:
"""
Schedule a periodic task

:param kind: Name of the task (value to display in logs)
:param action: The action to schedule periodically
:param interval: The interval at which to schedule the task. Can be specified as either a number of
seconds, or a cron string.
:param splay_value: When specifying the interval as a number of seconds, this parameter specifies
the number of seconds by which to delay the initial execution of this action.
"""
now = datetime.datetime.now().astimezone()

if isinstance(interval, int) and interval > 0:
self.logger.info(
"Scheduling periodic %s with interval %d and splay %d (first run at %s)",
kind,
interval,
splay_value,
(initial_time + datetime.timedelta(seconds=splay_value)).strftime(const.TIME_LOGFMT),
(now + datetime.timedelta(seconds=splay_value)).strftime(const.TIME_LOGFMT),
)
interval_schedule: IntervalSchedule = IntervalSchedule(
interval=float(interval), initial_delay=float(splay_value)
Expand All @@ -674,8 +685,9 @@ def periodic_schedule(
)
)
self.ensure_deploy_on_start = False
periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value, now)
periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value, now)

periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value)
periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value)

def _enable_time_trigger(self, action: TaskMethod, schedule: TaskSchedule) -> None:
self.process._sched.add_action(action, schedule)
Expand Down Expand Up @@ -976,7 +988,7 @@ def __init__(

self.agent_map: Optional[dict[str, str]] = agent_map

remote_executor = cfg.agent_executor_mode.get() == cfg.AgentExcutorMode.forking
remote_executor = cfg.agent_executor_mode.get() == cfg.AgentExecutorMode.forking
can_have_remote_executor = code_loader

self.executor_manager: executor.ExecutorManager[executor.Executor]
Expand Down Expand Up @@ -1065,6 +1077,7 @@ async def start(self) -> None:
# cache reference to THIS ioloop for handlers to push requests on it
self._io_loop = asyncio.get_running_loop()
await super().start()
await self.executor_manager.start()

async def add_end_point_name(self, name: str) -> None:
async with self._instances_lock:
Expand Down
28 changes: 23 additions & 5 deletions src/inmanta/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""

import enum
import functools
import logging
import typing
import uuid
Expand Down Expand Up @@ -149,6 +150,7 @@
is_time,
)


agent_get_resource_backoff: Option[float] = Option(
"config",
"agent-get-resource-backoff",
Expand All @@ -161,26 +163,42 @@
)


class AgentExcutorMode(str, enum.Enum):
class AgentExecutorMode(str, enum.Enum):
threaded = "threaded"
forking = "forking"


def is_executor_mode(value: str | AgentExcutorMode) -> AgentExcutorMode:
def is_executor_mode(value: str | AgentExecutorMode) -> AgentExecutorMode:
"""threaded | forking"""
if isinstance(value, AgentExcutorMode):
if isinstance(value, AgentExecutorMode):
return value
return AgentExcutorMode(value)
return AgentExecutorMode(value)


agent_executor_mode = Option(
"agent",
"executor-mode",
AgentExcutorMode.threaded,
AgentExecutorMode.threaded,
"EXPERIMENTAL: set the agent to use threads or fork subprocesses to create workers.",
is_executor_mode,
)

# `executor-cap` option in the [agent] config section: upper bound on the number of executors
# kept alive at the same time (default 3). Per the description below, the oldest executor is
# stopped first when a new one is requested while the cap is already reached.
# NOTE(review): is_lower_bounded_int(1) presumably rejects values below 1 -- confirm against its definition.
agent_executor_cap = Option[int](
"agent",
"executor-cap",
3,
"Maximum number of concurrent executors to keep per environment, per agent. If this limit is already reached "
"when creating a new executor, the oldest one will be stopped first.",
is_lower_bounded_int(1),
)

# `executor-retention-time` option in the [agent] config section: idle period, in seconds
# (default 60), to wait before an inactive executor is cleaned up. The value is validated by `is_time`.
agent_executor_retention_time = Option[int](
"agent",
"executor-retention-time",
60,
"Amount of time (in seconds) to wait before cleaning up inactive executors.",
is_time,
)

##############################
# agent_rest_transport
Expand Down
14 changes: 12 additions & 2 deletions src/inmanta/agent/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,14 @@ class ExecutorManager(abc.ABC, typing.Generic[E]):
@abc.abstractmethod
async def get_executor(self, agent_name: str, agent_uri: str, code: typing.Collection[ResourceInstallSpec]) -> E:
"""
Retrieves an Executor based on the agent name and blueprint.
Retrieves an Executor for a given agent with the relevant handler code loaded in its venv.
If an Executor does not exist for the given configuration, a new one is created.

:param agent_name: The name of the agent for which an Executor is being retrieved or created.
:param blueprint: The ExecutorBlueprint defining the configuration for the Executor.
:param agent_uri: The name of the host on which the agent is running.
:param code: Collection of ResourceInstallSpec defining the configuration for the Executor i.e.
which resource types it can act on and all necessary information to install the relevant
handler code in its venv.
:return: An Executor instance
"""
pass
Expand All @@ -491,6 +494,13 @@ async def stop_for_agent(self, agent_name: str) -> list[E]:
"""
pass

@abc.abstractmethod
async def start(self) -> None:
    """
    Start the manager.

    Abstract coroutine: every concrete manager must implement it. It takes no arguments
    and returns nothing. The agent awaits it from its own ``start()`` right after the
    parent class has finished starting.
    """
    pass

@abc.abstractmethod
async def stop(self) -> None:
"""
Expand Down
Loading