Skip to content

Commit

Permalink
Add policy to cleanup old executors (Issue #7694, PR #7765)
Browse files Browse the repository at this point in the history
# Description

Closes #7694

Add 2 config options:
- `executor_retention`: executors idle for longer than this get cleaned up
- `executor_cap_per_agent`: if this limit is already reached when attempting to create a new executor for this agent, the oldest executor is first stopped.

# Self Check:

Strike through any lines that are not applicable (`~~line~~`) then check the box

- [x] Attached issue to pull request
- [x] Changelog entry
- [x] Type annotations are present
- [x] Code is clear and sufficiently documented
- [x] No (preventable) type errors (check using make mypy or make mypy-diff)
- [x] Sufficient test cases (reproduces the bug/tests the requested feature)
- [x] Correct, in line with design
~~- [ ] End user documentation is included or an issue is created for end-user documentation (add ref to issue here: )~~
~~- [ ] If this PR fixes a race condition in the test suite, also push the fix to the relevant stable branche(s) (see [test-fixes](https://internal.inmanta.com/development/core/tasks/build-master.html#test-fixes) for more info)~~
  • Loading branch information
Hugo-Inmanta authored and inmantaci committed Jul 3, 2024
1 parent 30df9cc commit 04983c4
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 27 deletions.
4 changes: 4 additions & 0 deletions changelogs/unreleased/7694-executor-cleanup.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
description: Add policy to cleanup old executors
issue-nr: 7694
change-type: patch
destination-branches: [master]
23 changes: 18 additions & 5 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -639,15 +639,26 @@ def periodic_schedule(
action: Callable[[], Coroutine[object, None, object]],
interval: Union[int, str],
splay_value: int,
initial_time: datetime.datetime,
) -> bool:
"""
Schedule a periodic task
:param kind: Name of the task (value to display in logs)
:param action: The action to schedule periodically
:param interval: The interval at which to schedule the task. Can be specified as either a number of
seconds, or a cron string.
:param splay_value: When specifying the interval as a number of seconds, this parameter specifies
the number of seconds by which to delay the initial execution of this action.
"""
now = datetime.datetime.now().astimezone()

if isinstance(interval, int) and interval > 0:
self.logger.info(
"Scheduling periodic %s with interval %d and splay %d (first run at %s)",
kind,
interval,
splay_value,
(initial_time + datetime.timedelta(seconds=splay_value)).strftime(const.TIME_LOGFMT),
(now + datetime.timedelta(seconds=splay_value)).strftime(const.TIME_LOGFMT),
)
interval_schedule: IntervalSchedule = IntervalSchedule(
interval=float(interval), initial_delay=float(splay_value)
Expand All @@ -674,8 +685,9 @@ def periodic_schedule(
)
)
self.ensure_deploy_on_start = False
periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value, now)
periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value, now)

periodic_schedule("deploy", deploy_action, self._deploy_interval, self._deploy_splay_value)
periodic_schedule("repair", repair_action, self._repair_interval, self._repair_splay_value)

def _enable_time_trigger(self, action: TaskMethod, schedule: TaskSchedule) -> None:
self.process._sched.add_action(action, schedule)
Expand Down Expand Up @@ -976,7 +988,7 @@ def __init__(

self.agent_map: Optional[dict[str, str]] = agent_map

remote_executor = cfg.agent_executor_mode.get() == cfg.AgentExcutorMode.forking
remote_executor = cfg.agent_executor_mode.get() == cfg.AgentExecutorMode.forking
can_have_remote_executor = code_loader

self.executor_manager: executor.ExecutorManager[executor.Executor]
Expand Down Expand Up @@ -1065,6 +1077,7 @@ async def start(self) -> None:
# cache reference to THIS ioloop for handlers to push requests on it
self._io_loop = asyncio.get_running_loop()
await super().start()
await self.executor_manager.start()

async def add_end_point_name(self, name: str) -> None:
async with self._instances_lock:
Expand Down
28 changes: 23 additions & 5 deletions src/inmanta/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""

import enum
import functools
import logging
import typing
import uuid
Expand Down Expand Up @@ -149,6 +150,7 @@
is_time,
)


agent_get_resource_backoff: Option[float] = Option(
"config",
"agent-get-resource-backoff",
Expand All @@ -161,26 +163,42 @@
)


class AgentExcutorMode(str, enum.Enum):
class AgentExecutorMode(str, enum.Enum):
threaded = "threaded"
forking = "forking"


def is_executor_mode(value: str | AgentExcutorMode) -> AgentExcutorMode:
def is_executor_mode(value: str | AgentExecutorMode) -> AgentExecutorMode:
"""threaded | forking"""
if isinstance(value, AgentExcutorMode):
if isinstance(value, AgentExecutorMode):
return value
return AgentExcutorMode(value)
return AgentExecutorMode(value)


agent_executor_mode = Option(
"agent",
"executor-mode",
AgentExcutorMode.threaded,
AgentExecutorMode.threaded,
"EXPERIMENTAL: set the agent to use threads or fork subprocesses to create workers.",
is_executor_mode,
)

agent_executor_cap = Option[int](
"agent",
"executor-cap",
3,
"Maximum number of concurrent executors to keep per environment, per agent. If this limit is already reached "
"when creating a new executor, the oldest one will be stopped first.",
is_lower_bounded_int(1),
)

agent_executor_retention_time = Option[int](
"agent",
"executor-retention-time",
60,
"Amount of time (in seconds) to wait before cleaning up inactive executors.",
is_time,
)

##############################
# agent_rest_transport
Expand Down
14 changes: 12 additions & 2 deletions src/inmanta/agent/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,11 +471,14 @@ class ExecutorManager(abc.ABC, typing.Generic[E]):
@abc.abstractmethod
async def get_executor(self, agent_name: str, agent_uri: str, code: typing.Collection[ResourceInstallSpec]) -> E:
"""
Retrieves an Executor based on the agent name and blueprint.
Retrieves an Executor for a given agent with the relevant handler code loaded in its venv.
If an Executor does not exist for the given configuration, a new one is created.
:param agent_name: The name of the agent for which an Executor is being retrieved or created.
:param blueprint: The ExecutorBlueprint defining the configuration for the Executor.
:param agent_uri: The name of the host on which the agent is running.
:param code: Collection of ResourceInstallSpec defining the configuration for the Executor i.e.
which resource types it can act on and all necessary information to install the relevant
handler code in its venv.
:return: An Executor instance
"""
pass
Expand All @@ -491,6 +494,13 @@ async def stop_for_agent(self, agent_name: str) -> list[E]:
"""
pass

@abc.abstractmethod
async def start(self) -> None:
"""
Start the manager.
"""
pass

@abc.abstractmethod
async def stop(self) -> None:
"""
Expand Down
Loading

0 comments on commit 04983c4

Please sign in to comment.