Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue/7694 executor cleanup #7765

Closed
wants to merge 59 commits into from
Closed
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
0d980fd
[WIP]
Hugo-Inmanta Jun 21, 2024
ed30a8c
[WIP]
Hugo-Inmanta Jun 24, 2024
ca300eb
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 24, 2024
2578db9
[WIP]
Hugo-Inmanta Jun 24, 2024
996c9a5
[WIP]
Hugo-Inmanta Jun 25, 2024
0531ecb
[WIP]
Hugo-Inmanta Jun 25, 2024
368a2d3
[WIP]
Hugo-Inmanta Jun 25, 2024
77b38ba
[WIP]
Hugo-Inmanta Jun 25, 2024
9ccb204
[WIP]
Hugo-Inmanta Jun 25, 2024
4d633a8
[WIP]
Hugo-Inmanta Jun 26, 2024
f216ec4
[WIP]
Hugo-Inmanta Jun 26, 2024
b120b1a
[WIP]
Hugo-Inmanta Jun 26, 2024
750f0a3
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 26, 2024
400f1d7
[WIP]
Hugo-Inmanta Jun 26, 2024
dd1cea8
[WIP]
Hugo-Inmanta Jun 26, 2024
78063a9
[WIP]
Hugo-Inmanta Jun 26, 2024
7181c79
[WIP]
Hugo-Inmanta Jun 26, 2024
9470de1
[WIP]
Hugo-Inmanta Jun 26, 2024
44c2622
[WIP]
Hugo-Inmanta Jun 26, 2024
ef2851f
[WIP]
Hugo-Inmanta Jun 26, 2024
fcd7eb4
[WIP]
Hugo-Inmanta Jun 26, 2024
8170cc7
[WIP]
Hugo-Inmanta Jun 26, 2024
42f2455
[WIP]
Hugo-Inmanta Jun 26, 2024
25c95e0
[WIP]
Hugo-Inmanta Jun 27, 2024
32c98cd
[WIP]
Hugo-Inmanta Jun 27, 2024
8c58031
[WIP]
Hugo-Inmanta Jun 27, 2024
76ee7ac
[WIP]
Hugo-Inmanta Jun 28, 2024
2653273
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 28, 2024
2a715fd
[WIP]
Hugo-Inmanta Jun 28, 2024
c9239d9
[WIP]
Hugo-Inmanta Jun 28, 2024
5532ff4
[WIP]
Hugo-Inmanta Jun 28, 2024
33fd080
[WIP]
Hugo-Inmanta Jun 28, 2024
a35b38d
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jun 28, 2024
76a2842
[WIP]
Hugo-Inmanta Jun 28, 2024
1a6682a
[WIP]
Hugo-Inmanta Jul 1, 2024
74c2c50
[WIP]
Hugo-Inmanta Jul 1, 2024
79b04f7
[WIP]
Hugo-Inmanta Jul 2, 2024
ed14455
[WIP]
Hugo-Inmanta Jul 2, 2024
f2e10d0
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 2, 2024
9fd9db2
[WIP]
Hugo-Inmanta Jul 2, 2024
61c8aa2
[WIP]
Hugo-Inmanta Jul 2, 2024
f2954c9
[WIP]
Hugo-Inmanta Jul 2, 2024
895b619
[WIP]
Hugo-Inmanta Jul 2, 2024
fffd6a1
[WIP]
Hugo-Inmanta Jul 3, 2024
cf7a1c7
[WIP]
Hugo-Inmanta Jul 3, 2024
a0469d5
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 3, 2024
9d4f7f1
[WIP]
Hugo-Inmanta Jul 3, 2024
8d5963a
Update src/inmanta/config.py
Hugo-Inmanta Jul 3, 2024
558cdde
[WIP] increase retention time slightly to avoid unwanted cleanups
Hugo-Inmanta Jul 3, 2024
b536897
[WIP]
Hugo-Inmanta Jul 3, 2024
92f4b90
[WIP]
Hugo-Inmanta Jul 3, 2024
142cfc0
[WIP]
Hugo-Inmanta Jul 3, 2024
08ee9e9
[WIP]
Hugo-Inmanta Jul 3, 2024
025a81b
[WIP]
Hugo-Inmanta Jul 3, 2024
060a005
[WIP]
Hugo-Inmanta Jul 3, 2024
e815992
[WIP]
Hugo-Inmanta Jul 3, 2024
2bedb4a
[WIP]
Hugo-Inmanta Jul 3, 2024
9b7c7b9
[WIP]
Hugo-Inmanta Jul 3, 2024
7cba0da
Merge branch 'master' into issue/7694-executor-cleanup
Hugo-Inmanta Jul 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1077,6 +1077,7 @@ async def start(self) -> None:
# cache reference to THIS ioloop for handlers to push requests on it
self._io_loop = asyncio.get_running_loop()
await super().start()
await self.executor_manager.start()

async def add_end_point_name(self, name: str) -> None:
async with self._instances_lock:
Expand Down
3 changes: 2 additions & 1 deletion src/inmanta/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""

import enum
import functools
import logging
import typing
import uuid
Expand Down Expand Up @@ -188,7 +189,7 @@ def is_executor_mode(value: str | AgentExecutorMode) -> AgentExecutorMode:
3,
"Maximum number of concurrent executors to keep per environment, per agent. If this limit is already reached "
"when creating a new executor, the oldest one will be stopped first.",
is_int,
is_lower_bounded_int(1),
)

agent_executor_retention_time = Option[int](
Expand Down
7 changes: 7 additions & 0 deletions src/inmanta/agent/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,13 @@ async def stop_for_agent(self, agent_name: str) -> list[E]:
"""
pass

@abc.abstractmethod
async def start(self) -> None:
    """
    Start the manager.

    Must be called before the manager is used. Implementations may launch
    background work here (e.g. the periodic cleanup of idle executors).
    """
    pass

@abc.abstractmethod
async def stop(self) -> None:
"""
Expand Down
113 changes: 82 additions & 31 deletions src/inmanta/agent/forking_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
Contact: [email protected]
"""

import abc
import asyncio
import collections
import concurrent.futures
Expand All @@ -29,8 +30,9 @@
import socket
import typing
import uuid
from abc import abstractmethod
from asyncio import Future, transports
from datetime import timedelta
from typing import Optional, Sequence

import inmanta.agent.cache
import inmanta.agent.executor
Expand Down Expand Up @@ -175,7 +177,7 @@ def __init__(self, name: str):
# Keeps track of when this client was active last
self.last_used_at: datetime.datetime = datetime.datetime.now().astimezone()

def get_idle_time(self) -> timedelta:
def get_idle_time(self) -> datetime.timedelta:
return datetime.datetime.now().astimezone() - self.last_used_at

@typing.overload
Expand All @@ -201,8 +203,8 @@ def has_outstanding_calls(self) -> bool:
return len(self.requests) > 0

def process_reply(self, frame: IPCReplyFrame) -> None:
self.last_used_at = datetime.datetime.now().astimezone()
super().process_reply(frame)
self.last_used_at = datetime.datetime.now().astimezone()


class StopCommand(inmanta.protocol.ipc_light.IPCMethod[ExecutorContext, None]):
Expand Down Expand Up @@ -397,7 +399,21 @@ async def serve() -> None:
exit(0)


class MPExecutor(executor.Executor):
class PoolMember(abc.ABC):
    """
    Interface for an object that is held in a managed pool and can be
    reclaimed when it has been idle for too long (e.g. an executor process).
    """

    @abstractmethod
    def can_be_cleaned_up(self, retention_time: int) -> bool:
        """
        Return True iff this member is idle and has not been used for more
        than ``retention_time`` seconds, i.e. it can safely be stopped.
        """

    @abstractmethod
    def last_used(self) -> datetime.datetime:
        """
        Return the timestamp at which this member was last active
        (implementations in this file use timezone-aware datetimes).
        """

    @abstractmethod
    async def stop(self) -> None:
        """Gracefully stop this pool member."""


class MPExecutor(executor.Executor, PoolMember):
"""A Single Child Executor"""

def __init__(
Expand Down Expand Up @@ -435,7 +451,10 @@ def can_be_cleaned_up(self, retention_time: int) -> bool:
if self.connection.has_outstanding_calls():
return False

return self.connection.get_idle_time() > timedelta(seconds=retention_time)
return self.connection.get_idle_time() > datetime.timedelta(seconds=retention_time)

def last_used(self) -> datetime.datetime:
return self.connection.last_used_at

async def force_stop(self, grace_time: float = inmanta.const.SHUTDOWN_GRACE_HARD) -> None:
"""Stop by process close"""
Expand Down Expand Up @@ -518,7 +537,24 @@ async def get_facts(self, resource: "inmanta.agent.executor.ResourceDetails") ->
return await self.connection.call(FactsCommand(resource))


class MPManager(executor.ExecutorManager[MPExecutor]):
class PoolManager(abc.ABC):
    """
    Interface for an object that manages a pool of :class:`PoolMember` instances.
    """

    def __init__(self, max_time: int, max_pool_size: int, min_pool_size: int):
        # NOTE(review): parameters are currently unused placeholders for the
        # pool sizing policy — confirm intended semantics before relying on them.
        pass

    @abstractmethod
    async def start(self) -> None:
        """Start the manager (e.g. schedule periodic cleanup of idle members)."""

    @abstractmethod
    async def stop(self) -> None:
        """Stop the manager and all of its pool members."""

    @abstractmethod
    def get_pool_members(self) -> Sequence[PoolMember]:
        """Return all members currently held in the pool."""


class MPManager(executor.ExecutorManager[MPExecutor], PoolManager):
"""
This is the executor that provides the new behavior (ISO8+),
where the agent forks executors in specific venvs to prevent code reloading.
Expand Down Expand Up @@ -569,7 +605,9 @@ def __init__(
self.executor_retention_time = inmanta.agent.config.agent_executor_retention_time.get()
self.max_executors_per_agent = inmanta.agent.config.agent_executor_cap.get()

self.start_periodic_executor_cleanup()
# We keep a reference to the periodic cleanup task to prevent it
# from disappearing mid-execution https://docs.python.org/3.11/library/asyncio-task.html#creating-tasks
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wow, nice!

self.cleanup_job: Optional[asyncio.Task[None]] = None

def __add_executor(self, theid: executor.ExecutorId, the_executor: MPExecutor) -> None:
self.executor_map[theid] = the_executor
Expand All @@ -588,6 +626,9 @@ def __remove_executor(self, the_executor: MPExecutor) -> None:
self.executor_map.pop(theid)
self.agent_map[theid.agent_name].discard(theid)

def get_pool_members(self) -> Sequence[MPExecutor]:
    """Return all executor child processes currently managed by this pool."""
    return self.children

@classmethod
def init_once(cls) -> None:
try:
Expand Down Expand Up @@ -737,7 +778,12 @@ def _make_child(self, name: str, log_level: int, cli_log: bool) -> tuple[multipr

return p, parent_conn

async def start(self) -> None:
    """Start the manager and launch the periodic idle-executor cleanup task."""
    self.running = True
    # Keep a reference to the task so it is not garbage-collected mid-execution
    # (see the asyncio.create_task documentation).
    self.cleanup_job = asyncio.create_task(self.cleanup_inactive_executors())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the join method, you could await self.cleanup_job to make sure it is down.


async def stop(self) -> None:
    """Stop the manager: end the cleanup loop and stop all child executors."""
    # Setting running to False makes cleanup_inactive_executors() exit its loop.
    self.running = False
    await asyncio.gather(*(child.stop() for child in self.children))

async def force_stop(self, grace_time: float) -> None:
Expand All @@ -746,47 +792,52 @@ async def force_stop(self, grace_time: float) -> None:
async def join(self, thread_pool_finalizer: list[concurrent.futures.ThreadPoolExecutor], timeout: float) -> None:
    """
    Wait for shutdown of this manager: join all child executors and wind down
    the periodic cleanup task.

    :param thread_pool_finalizer: list to which this manager's thread pool is
        appended, for finalization by the caller
    :param timeout: per-child join timeout in seconds
    """
    thread_pool_finalizer.append(self.thread_pool)
    await asyncio.gather(*(child.join(timeout) for child in self.children))
    if self.cleanup_job:
        # Cancel instead of plain await: the cleanup loop may be sleeping for up
        # to executor_retention_time seconds, which would needlessly delay shutdown.
        self.cleanup_job.cancel()
        try:
            await self.cleanup_job
        except asyncio.CancelledError:
            pass

async def stop_for_agent(self, agent_name: str) -> list[MPExecutor]:
    """Stop every executor belonging to the given agent and return them."""
    stopped: list[MPExecutor] = [self.executor_map[child_id] for child_id in self.agent_map[agent_name]]
    await asyncio.gather(*(an_executor.stop() for an_executor in stopped))
    return stopped

def start_periodic_executor_cleanup(self) -> None:
async def cleanup_inactive_executors() -> None:
"""
This task cleans up idle executors and reschedules itself
"""
async def cleanup_inactive_executors(self) -> None:
"""
This task periodically cleans up idle executors
"""
while self.running:
cleanup_start = datetime.datetime.now().astimezone()

now = datetime.datetime.now().astimezone()
reschedule_interval: float = self.executor_retention_time
for _executor in self.executor_map.values():
if _executor.can_be_cleaned_up(self.executor_retention_time):
LOGGER.debug(
"Stopping executor %s because it "
"was inactive for %d s, which is longer then the retention time of %d s.",
_executor.executor_id.identity(),
_executor.connection.get_idle_time().total_seconds(),
self.executor_retention_time,
)
async with self._locks.get(_executor.executor_id.identity()):
sanderr marked this conversation as resolved.
Show resolved Hide resolved
try:
await _executor.stop()
except Exception:
LOGGER.debug(
"Unexpected error during executor %s cleanup:", _executor.executor_id.identity(), exc_info=True
)
# Check that the executor can still be cleaned up by the time we have acquired the lock
if _executor.can_be_cleaned_up(self.executor_retention_time):
try:
LOGGER.debug(
"Stopping executor %s because it "
"was inactive for %d s, which is longer then the retention time of %d s.",
_executor.executor_id.identity(),
_executor.connection.get_idle_time().total_seconds(),
self.executor_retention_time,
)
await _executor.stop()
except Exception:
LOGGER.debug(
"Unexpected error during executor %s cleanup:",
_executor.executor_id.identity(),
exc_info=True,
)
else:
reschedule_interval = min(
reschedule_interval,
(
now - _executor.connection.last_used_at + timedelta(seconds=self.executor_retention_time)
datetime.timedelta(seconds=self.executor_retention_time)
- (cleanup_start - _executor.connection.last_used_at)
).total_seconds(),
)

LOGGER.debug(f"Scheduling next cleanup_inactive_executors in {reschedule_interval} s.")
await asyncio.sleep(reschedule_interval)
asyncio.create_task(cleanup_inactive_executors())
cleanup_end = datetime.datetime.now().astimezone()

asyncio.ensure_future(cleanup_inactive_executors())
await asyncio.sleep(max(0.0, reschedule_interval - (cleanup_end - cleanup_start).total_seconds()))
3 changes: 3 additions & 0 deletions src/inmanta/agent/in_process_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ async def stop(self) -> None:
for child in self.executors.values():
child.stop()

async def start(self) -> None:
    """Start the manager: a no-op for the in-process executor manager."""
    pass

async def stop_for_agent(self, agent_name: str) -> list[InProcessExecutor]:
if agent_name in self.executors:
out = self.executors[agent_name]
Expand Down
13 changes: 13 additions & 0 deletions src/inmanta/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,19 @@ def is_list(value: str | list[str]) -> list[str]:
return [] if value == "" else [x.strip() for x in value.split(",")]


def is_lower_bounded_int(lower_bound: int) -> Callable[[str | int], int]:
"""lower-bounded int factory"""

def inner(value: str | int) -> int:
to_int = int(value)
if to_int < lower_bound:
raise ValueError(f"Value can not be lower than {lower_bound}")
return to_int

inner.__doc__ = f"int >= {lower_bound}"
return inner


def is_map(map_in: str | typing.Mapping[str, str]) -> typing.Mapping[str, str]:
"""List of comma-separated key=value pairs"""
if isinstance(map_in, typing.Mapping):
Expand Down
6 changes: 4 additions & 2 deletions tests/forking_agent/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ def set_custom_executor_policy():
inmanta.agent.config.agent_executor_cap.set("2")

old_retention_value = inmanta.agent.config.agent_executor_retention_time.get()
# Clean up executors after 2s of inactivity
inmanta.agent.config.agent_executor_retention_time.set("2")
# Clean up executors after 3s of inactivity
inmanta.agent.config.agent_executor_retention_time.set("3")
Comment on lines +89 to +90
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Increased this a bit because it was causing premature cleanup in the test case: the old executor is supposed to get cleaned up when the agent cap is reached, but with this setting too low, it would automatically get cleaned up before then.


yield

Expand Down Expand Up @@ -115,6 +115,8 @@ async def test_executor_server(set_custom_executor_policy, mpmanager: MPManager,
import lorem # noqa: F401

manager = mpmanager
await manager.start()

inmanta.config.Config.set("test", "aaa", "bbbb")

# Simple empty venv
Expand Down
17 changes: 17 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
Contact: [email protected]
"""

import functools
import logging
import os
import random
import socket

import netifaces
import pytest
from tornado import netutil

import inmanta.agent.config as cfg
Expand Down Expand Up @@ -347,3 +349,18 @@ def test_option_is_list_empty():
option: Option = Option("test", "list", "default,values", "documentation", cfg.is_list)
option.set("")
assert option.get() == []


def test_option_is_lower_bounded_int():
    """Verify the is_lower_bounded_int validator through the Option machinery."""
    lower_bound = 1
    # Call the factory to obtain the single-argument validator. Wrapping the
    # factory in functools.partial(cfg.is_lower_bounded_int, lower_bound) would
    # be wrong: Option invokes the validator as validator(value), which would
    # then call the one-argument factory with two arguments (TypeError).
    option: Option = Option("test", "lb_int", lower_bound, "documentation", cfg.is_lower_bounded_int(lower_bound))
    option.set("2")
    assert option.get() == 2

    option.set("0")
    with pytest.raises(ValueError) as exc_info:
        option.get()

    assert f"Value can not be lower than {lower_bound}" in str(exc_info.value)